Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tx lower bound #498

Merged
merged 2 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class EthereumLowerBoundService(
return listOf(
EthereumLowerBoundStateDetector(upstream),
EthereumLowerBoundBlockDetector(upstream),
EthereumLowerBoundTxDetector(upstream),
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import io.emeraldpay.dshackle.upstream.lowerbound.detector.RecursiveLowerBound
import io.emeraldpay.dshackle.upstream.lowerbound.toHex
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import reactor.core.publisher.Flux

const val MAX_OFFSET = 20

class EthereumLowerBoundTxDetector(
private val upstream: Upstream,
) : LowerBoundDetector() {
private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.TX, setOf("No tx data"), lowerBounds)

override fun period(): Long {
return 3
}

override fun internalDetectLowerBound(): Flux<LowerBoundData> {
return recursiveLowerBound.recursiveDetectLowerBoundWithOffset(MAX_OFFSET) { block ->
upstream.getIngressReader()
.read(
ChainRequest(
"eth_getBlockTransactionCountByNumber",
ListParams(block.toHex()),
),
)
.doOnNext {
if (it.hasResult() && (it.getResult().contentEquals("null".toByteArray()) || it.getResultAsProcessedString().substring(2).toLong(16) == 0L)) {
throw IllegalStateException("No tx data")
}
}
}
}

override fun types(): Set<LowerBoundType> {
return setOf(LowerBoundType.TX)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import reactor.kotlin.core.publisher.toFlux
import reactor.util.retry.Retry
import reactor.util.retry.RetryBackoffSpec
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger

class RecursiveLowerBound(
private val upstream: Upstream,
Expand All @@ -21,18 +22,7 @@ class RecursiveLowerBound(
private val log = LoggerFactory.getLogger(this::class.java)

fun recursiveDetectLowerBound(hasData: (Long) -> Mono<ChainResponse>): Flux<LowerBoundData> {
return Mono.just(upstream.getHead())
.flatMap {
val currentHeight = it.getCurrentHeight()
if (currentHeight == null) {
Mono.empty()
} else if (!lowerBounds.contains(type)) {
Mono.just(LowerBoundBinarySearch(0, currentHeight))
} else {
// next calculations will be carried out only within the last range
Mono.just(LowerBoundBinarySearch(lowerBounds[type]!!.lowerBound, currentHeight))
}
}
return initialRange()
.expand { data ->
if (data.found) {
Mono.empty()
Expand All @@ -41,22 +31,106 @@ class RecursiveLowerBound(

if (data.left > data.right) {
val current = if (data.current == 0L) 1 else data.current
Mono.just(LowerBoundBinarySearch(current, true))
Mono.just(LowerBoundBinarySearchData(current, true))
} else {
hasData(middle)
.retryWhen(retrySpec(nonRetryableErrors))
.retryWhen(retrySpec(middle, nonRetryableErrors))
.flatMap(ChainResponse::requireResult)
.map { LowerBoundBinarySearchData(data.left, middle - 1, middle) }
.onErrorReturn(
LowerBoundBinarySearchData(
middle + 1,
data.right,
data.current,
),
)
}
}
}
.filter { it.found }
.next()
.map {
LowerBoundData(it.current, type)
}.toFlux()
}

fun recursiveDetectLowerBoundWithOffset(maxLimit: Int, hasData: (Long) -> Mono<ChainResponse>): Flux<LowerBoundData> {
val visitedBlocks = HashSet<Long>()
return Mono.justOrEmpty(lowerBounds[type]?.lowerBound)
.flatMapMany {
// at first, we try to check the current bound to prevent huge calculations
hasData(it!!)
.retryWhen(retrySpec(it, nonRetryableErrors))
.flatMap(ChainResponse::requireResult)
.map { LowerBoundData(lowerBounds[type]!!.lowerBound, type) }
.onErrorResume { Mono.empty() }
}.switchIfEmpty(
initialRange()
.expand { data ->
if (data.found) {
Mono.empty()
} else {
val middle = middleBlock(data)

if (data.left > data.right) {
val current = if (data.current == 0L) 1 else data.current
Mono.just(LowerBoundBinarySearchData(current, true))
} else {
hasData(middle)
.retryWhen(retrySpec(middle, nonRetryableErrors))
.flatMap(ChainResponse::requireResult)
.map { LowerBoundBinarySearchData(data.left, middle - 1, middle) }
.onErrorResume {
if (middle < 0) {
Mono.just(LowerBoundBinarySearchData(middle + 1, data.right, data.current))
} else {
shiftLeftAndSearch(data, middle, visitedBlocks, maxLimit, hasData)
}
}
}
}
}
.filter { it.found }
.next()
.map {
LowerBoundData(it.current, type)
}.toFlux(),
)
}

private fun shiftLeftAndSearch(
currentData: LowerBoundBinarySearchData,
currentMiddle: Long,
visitedBlocks: HashSet<Long>,
maxLimit: Int,
hasData: (Long) -> Mono<ChainResponse>,
): Mono<LowerBoundBinarySearchData> {
val count = AtomicInteger(0)
return Mono.just(LowerBoundBinarySearchData(currentMiddle - 1, false))
.expand { currentBlock ->
if (currentBlock.found) {
// to avoid extra handling
Mono.empty()
} else {
if (visitedBlocks.contains(currentBlock.current) || currentBlock.current < 0) {
// if this block has been already seen there is no need to check it again
Mono.just(LowerBoundBinarySearchData(currentMiddle + 1, currentData.right, currentData.current, true))
} else {
hasData(currentBlock.current)
.retryWhen(retrySpec(currentBlock.current, nonRetryableErrors))
.flatMap(ChainResponse::requireResult)
.map { true }
.onErrorReturn(false)
.map {
if (it) {
LowerBoundBinarySearch(data.left, middle - 1, middle)
// we found data at once and return it
LowerBoundBinarySearchData(currentData.left, currentBlock.current - 1, currentBlock.current, true)
}
.onErrorResume {
// otherwise we go the left until we reach the specified limit
count.incrementAndGet()
if (count.get() in 1..maxLimit) {
visitedBlocks.add(currentBlock.current)
Mono.just(LowerBoundBinarySearchData(currentBlock.current - 1, false))
} else {
LowerBoundBinarySearch(
middle + 1,
data.right,
data.current,
)
Mono.just(LowerBoundBinarySearchData(currentMiddle + 1, currentData.right, currentData.current, true))
}
}
}
Expand All @@ -65,11 +139,27 @@ class RecursiveLowerBound(
.filter { it.found }
.next()
.map {
LowerBoundData(it.current, type)
}.toFlux()
// in terms of the whole calculation we haven't found the bound
LowerBoundBinarySearchData(it.left, it.right, it.current)
}
}

private fun initialRange(): Mono<LowerBoundBinarySearchData> {
return Mono.just(upstream.getHead())
.flatMap {
val currentHeight = it.getCurrentHeight()
if (currentHeight == null) {
Mono.empty()
} else if (!lowerBounds.contains(type)) {
Mono.just(LowerBoundBinarySearchData(0, currentHeight))
} else {
// next calculations will be carried out only within the last range
Mono.just(LowerBoundBinarySearchData(lowerBounds[type]!!.lowerBound, currentHeight))
}
}
}

private fun retrySpec(nonRetryableErrors: Set<String>): RetryBackoffSpec {
private fun retrySpec(block: Long, nonRetryableErrors: Set<String>): RetryBackoffSpec {
return Retry.backoff(
Long.MAX_VALUE,
Duration.ofSeconds(1),
Expand All @@ -80,18 +170,20 @@ class RecursiveLowerBound(
}
.doAfterRetry {
log.debug(
"Error in calculation of lower block of upstream {}, retry attempt - {}, message - {}",
"Error in calculation of lower block {} of upstream {}, type - {}, retry attempt - {}, message - {}",
block,
upstream.getId(),
type,
it.totalRetries(),
it.failure().message,
)
}
}

private fun middleBlock(lowerBoundBinarySearch: LowerBoundBinarySearch): Long =
lowerBoundBinarySearch.left + (lowerBoundBinarySearch.right - lowerBoundBinarySearch.left) / 2
private fun middleBlock(lowerBoundBinarySearchData: LowerBoundBinarySearchData): Long =
lowerBoundBinarySearchData.left + (lowerBoundBinarySearchData.right - lowerBoundBinarySearchData.left) / 2

private data class LowerBoundBinarySearch(
private data class LowerBoundBinarySearchData(
val left: Long,
val right: Long,
val current: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundService
import io.emeraldpay.dshackle.upstream.ethereum.MAX_OFFSET
import io.emeraldpay.dshackle.upstream.ethereum.ZERO_ADDRESS
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService
Expand Down Expand Up @@ -45,13 +46,21 @@ class RecursiveLowerBoundServiceTest {
on {
read(ChainRequest("eth_getBlockByNumber", ListParams(it.toHex(), false)))
} doReturn Mono.just(ChainResponse(ByteArray(0), null))
on {
read(ChainRequest("eth_getBlockTransactionCountByNumber", ListParams(it.toHex())))
} doReturn Mono.just(ChainResponse("\"0x12\"".toByteArray(), null))
} else {
on {
read(ChainRequest("eth_getBalance", ListParams(ZERO_ADDRESS, it.toHex())))
} doReturn Mono.error(RuntimeException("missing trie node"))
on {
read(ChainRequest("eth_getBlockByNumber", ListParams(it.toHex(), false)))
} doReturn Mono.error(RuntimeException("No block data"))
for (block in it downTo it - MAX_OFFSET - 1) {
on {
read(ChainRequest("eth_getBlockTransactionCountByNumber", ListParams(block.toHex())))
} doReturn Mono.error(RuntimeException("No tx data"))
}
}
}
}
Expand All @@ -67,6 +76,7 @@ class RecursiveLowerBoundServiceTest {
.expectNoEvent(Duration.ofSeconds(15))
.expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.STATE }
.expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.BLOCK }
.expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.TX }
.thenCancel()
.verify(Duration.ofSeconds(3))

Expand All @@ -76,6 +86,7 @@ class RecursiveLowerBoundServiceTest {
listOf(
LowerBoundData(17964844L, LowerBoundType.STATE),
LowerBoundData(17964844L, LowerBoundType.BLOCK),
LowerBoundData(17964844L, LowerBoundType.TX),
),
)
}
Expand Down
Loading