Skip to content

Commit

Permalink
Lower bound for block data (#453)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Apr 17, 2024
1 parent cbfb77d commit c377083
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 61 deletions.
2 changes: 1 addition & 1 deletion emerald-grpc
1 change: 1 addition & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/StreamHead.kt
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class StreamHead(
LowerBoundType.SLOT -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_SLOT
LowerBoundType.UNKNOWN -> BlockchainOuterClass.LowerBoundType.UNRECOGNIZED
LowerBoundType.STATE -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_STATE
LowerBoundType.BLOCK -> BlockchainOuterClass.LowerBoundType.LOWER_BOUND_BLOCK
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.ChainResponse
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import io.emeraldpay.dshackle.upstream.lowerbound.detector.RecursiveLowerBound
import io.emeraldpay.dshackle.upstream.lowerbound.toHex
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

class EthereumLowerBoundBlockDetector(
private val upstream: Upstream,
) : LowerBoundDetector() {
private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.BLOCK, setOf("No block data"))

override fun period(): Long {
return 5
}

override fun internalDetectLowerBound(): Flux<LowerBoundData> {
return recursiveLowerBound.recursiveDetectLowerBound { block ->
if (block == 0L) {
Mono.just(ChainResponse(ByteArray(0), null))
} else {
upstream.getIngressReader()
.read(
ChainRequest(
"eth_getBlockByNumber",
ListParams(block.toHex(), false),
),
)
.doOnNext {
if (it.hasResult() && it.getResult().contentEquals("null".toByteArray())) {
throw IllegalStateException("No block data")
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ class EthereumLowerBoundService(
private val upstream: Upstream,
) : LowerBoundService(chain, upstream) {
override fun detectors(): List<LowerBoundDetector> {
return listOf(EthereumLowerBoundStateDetector(upstream))
return listOf(
EthereumLowerBoundStateDetector(upstream),
EthereumLowerBoundBlockDetector(upstream),
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ data class LowerBoundData(
}

enum class LowerBoundType {
UNKNOWN, STATE, SLOT
UNKNOWN, STATE, SLOT, BLOCK
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.emeraldpay.dshackle.upstream.lowerbound.toHex
import io.emeraldpay.dshackle.upstream.polkadot.PolkadotLowerBoundService
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.MethodSource
Expand All @@ -22,22 +23,93 @@ import reactor.test.StepVerifier
import java.time.Duration

class RecursiveLowerBoundServiceTest {
private val blocks = listOf(
9000000L, 13500000L, 15750000L, 16875000L, 17437500L, 17718750L, 17859375L, 17929688L, 17964844L, 17947266L,
17956055L, 17960449L, 17962646L, 17963745L, 17964294L, 17964569L, 17964706L, 17964775L, 17964809L, 17964826L,
17964835L, 17964839L, 17964841L, 17964842L, 17964843L,
)
private val hash1 = "0x1b1a5dd69e12aa12e2b9197be0d0cceef3dde6368ea6376ad7c8b06488c9cf6a"
private val hash2 = "0x1b1a5dd69e12aa12e2b9197be0d0cceef3dde6368ea6376ad7c8b06488c9cf7a"

@ParameterizedTest
@MethodSource("detectors")
fun `find lower block closer to the height`(
reader: ChainReader,
detectorClass: Class<LowerBoundService>,
) {
@Test
fun `find lower data for eth`() {
val head = mock<Head> {
on { getCurrentHeight() } doReturn 18000000
}
val reader = mock<ChainReader> {
blocks.forEach {
if (it == 17964844L) {
on {
read(ChainRequest("eth_getBalance", ListParams(ZERO_ADDRESS, it.toHex())))
} doReturn Mono.just(ChainResponse(ByteArray(0), null))
on {
read(ChainRequest("eth_getBlockByNumber", ListParams(it.toHex(), false)))
} doReturn Mono.just(ChainResponse(ByteArray(0), null))
} else {
on {
read(ChainRequest("eth_getBalance", ListParams(ZERO_ADDRESS, it.toHex())))
} doReturn Mono.error(RuntimeException("missing trie node"))
on {
read(ChainRequest("eth_getBlockByNumber", ListParams(it.toHex(), false)))
} doReturn Mono.error(RuntimeException("No block data"))
}
}
}
val upstream = mock<Upstream> {
on { getHead() } doReturn head
on { getIngressReader() } doReturn reader
}

val detector = detectorClass.getConstructor(Chain::class.java, Upstream::class.java).newInstance(Chain.UNSPECIFIED, upstream)
val detector = EthereumLowerBoundService(Chain.UNSPECIFIED, upstream)

StepVerifier.withVirtualTime { detector.detectLowerBounds() }
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(15))
.expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.STATE }
.expectNextMatches { it.lowerBound == 17964844L && it.type == LowerBoundType.BLOCK }
.thenCancel()
.verify(Duration.ofSeconds(3))

assertThat(detector.getLowerBounds().toList())
.usingRecursiveFieldByFieldElementComparatorIgnoringFields("timestamp")
.hasSameElementsAs(
listOf(
LowerBoundData(17964844L, LowerBoundType.STATE),
LowerBoundData(17964844L, LowerBoundType.BLOCK),
),
)
}

@Test
fun `find lower data for polka`() {
val head = mock<Head> {
on { getCurrentHeight() } doReturn 18000000
}
val reader = mock<ChainReader> {
blocks.forEach {
if (it == 17964844L) {
on {
read(ChainRequest("chain_getBlockHash", ListParams(it.toHex())))
} doReturn Mono.just(ChainResponse("\"$hash1\"".toByteArray(), null))
on {
read(ChainRequest("state_getMetadata", ListParams(hash1)))
} doReturn Mono.just(ChainResponse(ByteArray(0), null))
} else {
on {
read(ChainRequest("chain_getBlockHash", ListParams(it.toHex())))
} doReturn Mono.just(ChainResponse("\"$hash2\"".toByteArray(), null))
on {
read(ChainRequest("state_getMetadata", ListParams(hash2)))
} doReturn Mono.error(RuntimeException("State already discarded for"))
}
}
}
val upstream = mock<Upstream> {
on { getHead() } doReturn head
on { getIngressReader() } doReturn reader
}

val detector = PolkadotLowerBoundService(Chain.UNSPECIFIED, upstream)

StepVerifier.withVirtualTime { detector.detectLowerBounds() }
.expectSubscription()
Expand All @@ -49,7 +121,9 @@ class RecursiveLowerBoundServiceTest {
assertThat(detector.getLowerBounds().toList())
.usingRecursiveFieldByFieldElementComparatorIgnoringFields("timestamp")
.hasSameElementsAs(
listOf(LowerBoundData(17964844L, LowerBoundType.STATE)),
listOf(
LowerBoundData(17964844L, LowerBoundType.STATE),
),
)
}

Expand Down Expand Up @@ -84,56 +158,6 @@ class RecursiveLowerBoundServiceTest {
}

companion object {
private val blocks = listOf(
9000000L, 13500000L, 15750000L, 16875000L, 17437500L, 17718750L, 17859375L, 17929688L, 17964844L, 17947266L,
17956055L, 17960449L, 17962646L, 17963745L, 17964294L, 17964569L, 17964706L, 17964775L, 17964809L, 17964826L,
17964835L, 17964839L, 17964841L, 17964842L, 17964843L,
)
private const val hash1 = "0x1b1a5dd69e12aa12e2b9197be0d0cceef3dde6368ea6376ad7c8b06488c9cf6a"
private const val hash2 = "0x1b1a5dd69e12aa12e2b9197be0d0cceef3dde6368ea6376ad7c8b06488c9cf7a"

@JvmStatic
fun detectors(): List<Arguments> = listOf(
Arguments.of(
mock<ChainReader> {
blocks.forEach {
if (it == 17964844L) {
on {
read(ChainRequest("eth_getBalance", ListParams(ZERO_ADDRESS, it.toHex())))
} doReturn Mono.just(ChainResponse(ByteArray(0), null))
} else {
on {
read(ChainRequest("eth_getBalance", ListParams(ZERO_ADDRESS, it.toHex())))
} doReturn Mono.error(RuntimeException("missing trie node"))
}
}
},
EthereumLowerBoundService::class.java,
),
Arguments.of(
mock<ChainReader> {
blocks.forEach {
if (it == 17964844L) {
on {
read(ChainRequest("chain_getBlockHash", ListParams(it.toHex())))
} doReturn Mono.just(ChainResponse("\"$hash1\"".toByteArray(), null))
on {
read(ChainRequest("state_getMetadata", ListParams(hash1)))
} doReturn Mono.just(ChainResponse(ByteArray(0), null))
} else {
on {
read(ChainRequest("chain_getBlockHash", ListParams(it.toHex())))
} doReturn Mono.just(ChainResponse("\"$hash2\"".toByteArray(), null))
on {
read(ChainRequest("state_getMetadata", ListParams(hash2)))
} doReturn Mono.error(RuntimeException("State already discarded for"))
}
}
},
PolkadotLowerBoundService::class.java,
),
)

@JvmStatic
fun detectorsFirstBlock(): List<Arguments> = listOf(
Arguments.of(
Expand Down

0 comments on commit c377083

Please sign in to comment.