Skip to content

Commit

Permalink
Pass sort in direct reader (#588)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Nov 4, 2024
1 parent 152e6c3 commit 243a11e
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 58 deletions.
8 changes: 4 additions & 4 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ open class NativeCall(
return ctx.upstream.getLocalReader()
.flatMap { api ->
SpannedReader(api, tracer, LOCAL_READER)
.read(ctx.payload.toChainRequest(ctx.nonce, ctx.forwardedSelector, false, ctx.upstreamFilter.matcher))
.read(ctx.payload.toChainRequest(ctx.nonce, ctx.forwardedSelector, false, ctx.upstreamFilter))
.map {
val result = it.getResult()
val resolvedUpstreamData = it.resolvedUpstreamData.ifEmpty {
Expand Down Expand Up @@ -799,16 +799,16 @@ open class NativeCall(
selector: BlockchainOuterClass.Selector?,
streamRequest: Boolean,
): ChainRequest {
return toChainRequest(nonce, selector, streamRequest, Selector.empty)
return toChainRequest(nonce, selector, streamRequest, Selector.UpstreamFilter.default)
}

fun toChainRequest(
nonce: Long?,
selector: BlockchainOuterClass.Selector?,
streamRequest: Boolean,
matcher: Selector.Matcher,
upstreamFilter: Selector.UpstreamFilter,
): ChainRequest {
return ChainRequest(method, params, nonce, selector, streamRequest, matcher)
return ChainRequest(method, params, nonce, selector, streamRequest, upstreamFilter)
}
}
}
10 changes: 5 additions & 5 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/ChainRequest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ data class ChainRequest(
val nonce: Long?,
val selector: BlockchainOuterClass.Selector?,
val isStreamed: Boolean = false,
val matcher: Selector.Matcher = Selector.empty,
val upstreamFilter: Selector.UpstreamFilter = Selector.UpstreamFilter.default,
) {

@JvmOverloads constructor(
Expand All @@ -39,14 +39,14 @@ data class ChainRequest(
nonce: Long? = null,
selectors: BlockchainOuterClass.Selector? = null,
isStreamed: Boolean = false,
matcher: Selector.Matcher = Selector.empty,
) : this(method, params, 1, nonce, selectors, isStreamed, matcher)
upstreamFilter: Selector.UpstreamFilter = Selector.UpstreamFilter.default,
) : this(method, params, 1, nonce, selectors, isStreamed, upstreamFilter)

constructor(
method: String,
params: CallParams,
matcher: Selector.Matcher,
) : this(method, params, 1, null, null, false, matcher)
upstreamFilter: Selector.UpstreamFilter,
) : this(method, params, 1, null, null, false, upstreamFilter)

fun toJson(): ByteArray {
return params.toJson(id, method)
Expand Down
5 changes: 5 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ class Selector {
val matcher: Matcher,
) {
constructor(matcher: Matcher) : this(Sort.default, matcher)

companion object {
@JvmStatic
val default = UpstreamFilter(empty)
}
}

data class MultiMatcher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ open class EthereumCachingReader(
return SpannedReader(directReader.blockByFinalizationReader, tracer, DIRECT_QUORUM_RPC_READER)
}

open fun blocksByIdAsCont(matcher: Selector.Matcher): Reader<BlockId, Result<BlockContainer>> {
val idToBlockHash = Function<BlockId, Request<BlockHash>> { id -> Request(BlockHash.from(id.value), matcher) }
open fun blocksByIdAsCont(upstreamFilter: Selector.UpstreamFilter): Reader<BlockId, Result<BlockContainer>> {
val idToBlockHash = Function<BlockId, Request<BlockHash>> { id -> Request(BlockHash.from(id.value), upstreamFilter) }
return CompoundReader(
SpannedReader(CacheWithUpstreamIdReader(caches.getBlocksByHash()), tracer, CACHE_BLOCK_BY_HASH_READER),
SpannedReader(RekeyingReader(idToBlockHash, directReader.blockReader), tracer, DIRECT_QUORUM_RPC_READER),
)
}

open fun blocksByHeightAsCont(matcher: Selector.Matcher): Reader<Long, Result<BlockContainer>> {
val numToRequest = Function<Long, Request<Long>> { num -> Request(num, matcher) }
open fun blocksByHeightAsCont(upstreamFilter: Selector.UpstreamFilter): Reader<Long, Result<BlockContainer>> {
val numToRequest = Function<Long, Request<Long>> { num -> Request(num, upstreamFilter) }
return CompoundReader(
SpannedReader(CacheWithUpstreamIdReader(caches.getBlocksByHeight()), tracer, CACHE_BLOCK_BY_HEIGHT_READER),
SpannedReader(RekeyingReader(numToRequest, directReader.blockByHeightReader), tracer, DIRECT_QUORUM_RPC_READER),
Expand All @@ -84,8 +84,8 @@ open class EthereumCachingReader(
return directReader.logsByHashReader
}

open fun txByHashAsCont(matcher: Selector.Matcher): Reader<TxId, Result<TxContainer>> {
val idToTxHash = Function<TxId, Request<TransactionId>> { id -> Request(TransactionId.from(id.value), matcher) }
open fun txByHashAsCont(upstreamFilter: Selector.UpstreamFilter): Reader<TxId, Result<TxContainer>> {
val idToTxHash = Function<TxId, Request<TransactionId>> { id -> Request(TransactionId.from(id.value), upstreamFilter) }
return CompoundReader(
CacheWithUpstreamIdReader(SpannedReader(caches.getTxByHash(), tracer, CACHE_TX_BY_HASH_READER)),
SpannedReader(RekeyingReader(idToTxHash, directReader.txReader), tracer, DIRECT_QUORUM_RPC_READER),
Expand All @@ -100,9 +100,9 @@ open class EthereumCachingReader(
)
}

fun receipts(matcher: Selector.Matcher): Reader<TxId, Result<ByteArray>> {
fun receipts(upstreamFilter: Selector.UpstreamFilter): Reader<TxId, Result<ByteArray>> {
val requested = RekeyingReader(
{ txid: TxId -> Request(TransactionId.from(txid.value), matcher) },
{ txid: TxId -> Request(TransactionId.from(txid.value), upstreamFilter) },
directReader.receiptReader,
)
return CompoundReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,19 @@ class EthereumDirectReader(
blockReader = object : Reader<Request<BlockHash>, Result<BlockContainer>> {
override fun read(key: Request<BlockHash>): Mono<Result<BlockContainer>> {
val request = ChainRequest("eth_getBlockByHash", ListParams(key.requestBy.toHex(), false))
return readBlock(request, key.requestBy.toHex(), key.matcher)
return readBlock(request, key.requestBy.toHex(), key.upstreamFilter.matcher, key.upstreamFilter.sort)
}
}
blockByHeightReader = object : Reader<Request<Long>, Result<BlockContainer>> {
override fun read(key: Request<Long>): Mono<Result<BlockContainer>> {
val request = ChainRequest("eth_getBlockByNumber", ListParams(HexQuantity.from(key.requestBy).toHex(), false))
return readBlock(request, key.toString(), key.matcher)
return readBlock(request, key.toString(), key.upstreamFilter.matcher, key.upstreamFilter.sort)
}
}
txReader = object : Reader<Request<TransactionId>, Result<TxContainer>> {
override fun read(key: Request<TransactionId>): Mono<Result<TxContainer>> {
val request = ChainRequest("eth_getTransactionByHash", ListParams(key.requestBy.toHex()))
return readWithQuorum(request, key.matcher) // retries were removed because we use NotNullQuorum which handle errors too
return readWithQuorum(request, key.upstreamFilter.matcher, key.upstreamFilter.sort) // retries were removed because we use NotNullQuorum which handle errors too
.timeout(Duration.ofSeconds(5), Mono.error(TimeoutException("Tx not read $key")))
.flatMap { result ->
val tx = objectMapper.readValue(result.data, TransactionJsonSnapshot::class.java)
Expand Down Expand Up @@ -152,7 +152,7 @@ class EthereumDirectReader(
receiptReader = object : Reader<Request<TransactionId>, Result<ByteArray>> {
override fun read(key: Request<TransactionId>): Mono<Result<ByteArray>> {
val request = ChainRequest("eth_getTransactionReceipt", ListParams(key.requestBy.toHex()))
return readWithQuorum(request, key.matcher)
return readWithQuorum(request, key.upstreamFilter.matcher, key.upstreamFilter.sort)
.timeout(Duration.ofSeconds(5), Mono.error(TimeoutException("Receipt not read $key")))
.flatMap { result ->
val receipt = objectMapper.readValue(result.data, TransactionReceiptJson::class.java)
Expand Down Expand Up @@ -274,6 +274,6 @@ class EthereumDirectReader(

data class Request<T>(
val requestBy: T,
val matcher: Selector.Matcher,
val upstreamFilter: Selector.UpstreamFilter,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class EthereumLocalReader(
} catch (e: IllegalArgumentException) {
throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "[0] must be transaction id")
}
reader.txByHashAsCont(key.matcher)
reader.txByHashAsCont(key.upstreamFilter)
.read(hash)
.map { ChainResponse(it.data.json, null, it.resolvedUpstreamData) }
}
Expand All @@ -107,14 +107,14 @@ class EthereumLocalReader(
if (withTx) {
null
} else {
reader.blocksByIdAsCont(key.matcher).read(hash).map {
reader.blocksByIdAsCont(key.upstreamFilter).read(hash).map {
ChainResponse(it.data.json, null, it.resolvedUpstreamData)
}
}
}

method == "eth_getBlockByNumber" -> {
getBlockByNumber(params.list, key.matcher)
getBlockByNumber(params.list, key.upstreamFilter)
}

method == "eth_getTransactionReceipt" -> {
Expand All @@ -127,7 +127,7 @@ class EthereumLocalReader(
} catch (e: IllegalArgumentException) {
throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "[0] must be transaction id")
}
reader.receipts(key.matcher)
reader.receipts(key.upstreamFilter)
.read(hash)
.map { ChainResponse(it.data, null, it.resolvedUpstreamData) }
}
Expand All @@ -142,7 +142,7 @@ class EthereumLocalReader(
return null
}

fun getBlockByNumber(params: List<Any?>, matcher: Selector.Matcher): Mono<ChainResponse>? {
fun getBlockByNumber(params: List<Any?>, upstreamFilter: Selector.UpstreamFilter): Mono<ChainResponse>? {
if (params.size != 2 || params[0] == null || params[1] == null) {
throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "Must provide 2 parameters")
}
Expand Down Expand Up @@ -190,7 +190,7 @@ class EthereumLocalReader(
throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "[0] must be a block number")
}

return reader.blocksByHeightAsCont(matcher)
return reader.blocksByHeightAsCont(upstreamFilter)
.read(number).map { ChainResponse(it.data.json, null, it.resolvedUpstreamData) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
def act = reader.blockReader.read(new EthereumDirectReader.Request<BlockHash>(BlockHash.from(hash1), Selector.empty))
def act = reader.blockReader.read(new EthereumDirectReader.Request<BlockHash>(BlockHash.from(hash1), Selector.UpstreamFilter.default))
then:
StepVerifier.create(act)
.expectNextMatches { block ->
Expand Down Expand Up @@ -122,7 +122,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
def act = reader.blockReader.read(new EthereumDirectReader.Request<BlockHash>(BlockHash.from(hash1), Selector.empty))
def act = reader.blockReader.read(new EthereumDirectReader.Request<BlockHash>(BlockHash.from(hash1), Selector.UpstreamFilter.default))
then:
StepVerifier.create(act)
.expectComplete()
Expand Down Expand Up @@ -155,7 +155,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
def act = reader.blockByHeightReader.read(new EthereumDirectReader.Request<Long>(100L, Selector.empty))
def act = reader.blockByHeightReader.read(new EthereumDirectReader.Request<Long>(100L, Selector.UpstreamFilter.default))
then:
StepVerifier.create(act)
.expectNextMatches { block ->
Expand Down Expand Up @@ -220,7 +220,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
def act = reader.txReader.read(new EthereumDirectReader.Request<TransactionId>(TransactionId.from(hash1), Selector.empty))
def act = reader.txReader.read(new EthereumDirectReader.Request<TransactionId>(TransactionId.from(hash1), Selector.UpstreamFilter.default))
then:
StepVerifier.create(act)
.expectNextMatches { block ->
Expand Down Expand Up @@ -253,7 +253,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
def act = reader.receiptReader.read(new EthereumDirectReader.Request<TransactionId>(TransactionId.from(hash1), Selector.empty))
def act = reader.receiptReader.read(new EthereumDirectReader.Request<TransactionId>(TransactionId.from(hash1), Selector.UpstreamFilter.default))
.block(Duration.ofSeconds(1))
.with { new String(it.data) }
then:
Expand Down Expand Up @@ -287,7 +287,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
def act = reader.receiptReader.read(new EthereumDirectReader.Request<TransactionId>(TransactionId.from(hash1), Selector.empty))
def act = reader.receiptReader.read(new EthereumDirectReader.Request<TransactionId>(TransactionId.from(hash1), Selector.UpstreamFilter.default))
.block(Duration.ofSeconds(1))
.with { new String(it.data) }
then:
Expand All @@ -312,7 +312,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
def act = reader.txReader.read(new EthereumDirectReader.Request<TransactionId>(TransactionId.from(hash1), Selector.empty))
def act = reader.txReader.read(new EthereumDirectReader.Request<TransactionId>(TransactionId.from(hash1), Selector.UpstreamFilter.default))
then:
StepVerifier.create(act)
.expectComplete()
Expand Down Expand Up @@ -411,7 +411,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
def act = ethereumDirectReader.blockReader.read(new EthereumDirectReader.Request<BlockHash>(BlockHash.from(hash1), Selector.empty))
def act = ethereumDirectReader.blockReader.read(new EthereumDirectReader.Request<BlockHash>(BlockHash.from(hash1), Selector.UpstreamFilter.default))
then:
StepVerifier.create(act)
.expectNextMatches { block ->
Expand Down Expand Up @@ -451,7 +451,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
def act = ethereumDirectReader.blockByHeightReader.read(new EthereumDirectReader.Request<Long>(100L, Selector.empty))
def act = ethereumDirectReader.blockByHeightReader.read(new EthereumDirectReader.Request<Long>(100L, Selector.UpstreamFilter.default))
then:
StepVerifier.create(act)
.expectNextMatches { block ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class EthereumLocalReaderSpec extends Specification {
1 * getCurrentHeight() >> 101L
}
def reader = Mock(EthereumCachingReader) {
_ * blocksByIdAsCont(Selector.empty) >> new EmptyReader<>()
_ * txByHashAsCont(Selector.empty) >> new EmptyReader<>()
1 * blocksByHeightAsCont(Selector.empty) >> Mock(Reader) {
_ * blocksByIdAsCont(Selector.UpstreamFilter.default) >> new EmptyReader<>()
_ * txByHashAsCont(Selector.UpstreamFilter.default) >> new EmptyReader<>()
1 * blocksByHeightAsCont(Selector.UpstreamFilter.default) >> Mock(Reader) {
1 * read(101L) >> Mono.just(
new EthereumDirectReader.Result<>(TestingCommons.blockForEthereum(101L), List.of())
)
Expand All @@ -82,7 +82,7 @@ class EthereumLocalReaderSpec extends Specification {
def router = new EthereumLocalReader(reader, methods, head, null)

when:
def act = router.getBlockByNumber(["latest", false], Selector.empty)
def act = router.getBlockByNumber(["latest", false], Selector.UpstreamFilter.default)

then:
act != null
Expand All @@ -98,9 +98,9 @@ class EthereumLocalReaderSpec extends Specification {
setup:
def head = Stub(Head) {}
def reader = Mock(EthereumCachingReader) {
_ * blocksByIdAsCont(Selector.empty) >> new EmptyReader<>()
_ * txByHashAsCont(Selector.empty) >> new EmptyReader<>()
1 * blocksByHeightAsCont(Selector.empty) >> Mock(Reader) {
_ * blocksByIdAsCont(Selector.UpstreamFilter.default) >> new EmptyReader<>()
_ * txByHashAsCont(Selector.UpstreamFilter.default) >> new EmptyReader<>()
1 * blocksByHeightAsCont(Selector.UpstreamFilter.default) >> Mock(Reader) {
1 * read(0L) >> Mono.just(
new EthereumDirectReader.Result<>(TestingCommons.blockForEthereum(0L), List.of())
)
Expand All @@ -110,7 +110,7 @@ class EthereumLocalReaderSpec extends Specification {
def router = new EthereumLocalReader(reader, methods, head, null)

when:
def act = router.getBlockByNumber(["earliest", false], Selector.empty)
def act = router.getBlockByNumber(["earliest", false], Selector.UpstreamFilter.default)

then:
act != null
Expand All @@ -126,9 +126,9 @@ class EthereumLocalReaderSpec extends Specification {
setup:
def head = Stub(Head) {}
def reader = Mock(EthereumCachingReader) {
_ * blocksByIdAsCont(Selector.empty) >> new EmptyReader<>()
_ * txByHashAsCont(Selector.empty) >> new EmptyReader<>()
1 * blocksByHeightAsCont(Selector.empty) >> Mock(Reader) {
_ * blocksByIdAsCont(Selector.UpstreamFilter.default) >> new EmptyReader<>()
_ * txByHashAsCont(Selector.UpstreamFilter.default) >> new EmptyReader<>()
1 * blocksByHeightAsCont(Selector.UpstreamFilter.default) >> Mock(Reader) {
1 * read(74735L) >> Mono.just(
new EthereumDirectReader.Result<>(TestingCommons.blockForEthereum(74735L), List.of())
)
Expand All @@ -138,7 +138,7 @@ class EthereumLocalReaderSpec extends Specification {
def router = new EthereumLocalReader(reader, methods, head, null)

when:
def act = router.getBlockByNumber(["0x123ef", false], Selector.empty)
def act = router.getBlockByNumber(["0x123ef", false], Selector.UpstreamFilter.default)

then:
act != null
Expand Down Expand Up @@ -184,15 +184,15 @@ class EthereumLocalReaderSpec extends Specification {
setup:
def head = Mock(Head)
def reader = Mock(EthereumCachingReader) {
_ * blocksByIdAsCont(Selector.empty) >> new EmptyReader<>()
_ * txByHashAsCont(Selector.empty) >> new EmptyReader<>()
_ * blocksByHeightAsCont(Selector.empty) >> new EmptyReader<>()
_ * blocksByIdAsCont(Selector.UpstreamFilter.default) >> new EmptyReader<>()
_ * txByHashAsCont(Selector.UpstreamFilter.default) >> new EmptyReader<>()
_ * blocksByHeightAsCont(Selector.UpstreamFilter.default) >> new EmptyReader<>()
}
def methods = new DefaultEthereumMethods(Chain.ETHEREUM__MAINNET, false)
def router = new EthereumLocalReader(reader, methods, head, null)

when:
def act = router.getBlockByNumber(["0x0", true], Selector.empty)
def act = router.getBlockByNumber(["0x0", true], Selector.UpstreamFilter.default)

then:
act == null
Expand Down
Loading

0 comments on commit 243a11e

Please sign in to comment.