diff --git a/emerald-grpc b/emerald-grpc index 9397863a1..d8067885f 160000 --- a/emerald-grpc +++ b/emerald-grpc @@ -1 +1 @@ -Subproject commit 9397863a13a6fbe5e43bbf4aa3df59d7936b45e1 +Subproject commit d8067885f585346522e4f7f7f9754049e67918bd diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index fbf92100d..7f04ea32d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -342,6 +342,10 @@ abstract class Multistream( return lowerBounds.values } + override fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? { + return lowerBounds[lowerBoundType] + } + override fun getUpstreamSettingsData(): Upstream.UpstreamSettingsData? { return Upstream.UpstreamSettingsData( nodeId(), diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt index cb81ad91b..e5d63271b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt @@ -27,6 +27,7 @@ import io.emeraldpay.dshackle.upstream.MatchesResponse.NotMatchedResponse import io.emeraldpay.dshackle.upstream.MatchesResponse.SameNodeResponse import io.emeraldpay.dshackle.upstream.MatchesResponse.SlotHeightResponse import io.emeraldpay.dshackle.upstream.MatchesResponse.Success +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType import org.apache.commons.lang3.StringUtils import java.util.Collections @@ -72,13 +73,27 @@ class Selector { selectors.forEach { selector -> if (selector.hasHeightSelector() && selector.heightSelector.height == -1L) { return Sort(compareByDescending { it.getHead().getCurrentHeight() }) - } else if (selector.hasLowerHeightSelector() && selector.lowerHeightSelector.height == 0L) { - return Sort(compareBy(nullsLast()) { it.getHead().getCurrentHeight() }) + } else if (selector.hasLowerHeightSelector()) { + return Sort( + compareBy(nullsLast()) { + it.getLowerBound(fromProtoType(selector.lowerHeightSelector.lowerBoundType))?.lowerBound + }, + ) } } return Sort.default } + private fun fromProtoType(type: BlockchainOuterClass.LowerBoundType): LowerBoundType { + return when (type) { + BlockchainOuterClass.LowerBoundType.LOWER_BOUND_SLOT -> LowerBoundType.SLOT + BlockchainOuterClass.LowerBoundType.LOWER_BOUND_UNSPECIFIED -> LowerBoundType.UNKNOWN + BlockchainOuterClass.LowerBoundType.LOWER_BOUND_STATE -> LowerBoundType.STATE + BlockchainOuterClass.LowerBoundType.LOWER_BOUND_BLOCK -> LowerBoundType.BLOCK + BlockchainOuterClass.LowerBoundType.UNRECOGNIZED -> LowerBoundType.UNKNOWN + } + } + @JvmStatic fun convertToMatcher(req: BlockchainOuterClass.Selector?): LabelSelectorMatcher { return when { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt index 9276f796b..b00daadfb 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt @@ -47,6 +47,7 @@ interface Upstream : Lifecycle { fun getCapabilities(): Set fun isGrpc(): Boolean fun getLowerBounds(): Collection + fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? fun getUpstreamSettingsData(): UpstreamSettingsData? fun updateLowerBound(lowerBound: Long, type: LowerBoundType) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt index 9c1e1819b..0d46ed8fe 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt @@ -29,6 +29,7 @@ import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType import reactor.core.Disposable open class BitcoinRpcUpstream( @@ -76,6 +77,10 @@ open class BitcoinRpcUpstream( return emptyList() } + override fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? { + return null + } + override fun getUpstreamSettingsData(): Upstream.UpstreamSettingsData? { return null } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index 59dd137fa..f65d96c5e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -95,6 +95,10 @@ open class GenericUpstream( return lowerBoundService.getLowerBounds() } + override fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? { + return lowerBoundService.getLowerBound(lowerBoundType) + } + override fun getUpstreamSettingsData(): Upstream.UpstreamSettingsData? { return Upstream.UpstreamSettingsData( nodeId(), diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt index cf1c9b804..c6e0139cf 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt @@ -38,6 +38,7 @@ import io.emeraldpay.dshackle.upstream.bitcoin.ExtractBlock import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException import io.emeraldpay.dshackle.upstream.forkchoice.MostWorkForkChoice import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcGrpcClient import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import org.reactivestreams.Publisher @@ -154,6 +155,10 @@ class BitcoinGrpcUpstream( return emptyList() } + override fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? { + return null + } + override fun getUpstreamSettingsData(): Upstream.UpstreamSettingsData? { return null } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt index cee83d3c3..c1a2331e4 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt @@ -36,6 +36,7 @@ import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.ethereum.domain.BlockHash import io.emeraldpay.dshackle.upstream.forkchoice.NoChoiceWithPriorityForkChoice import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcGrpcClient import reactor.core.publisher.Flux import reactor.core.scheduler.Scheduler @@ -183,6 +184,10 @@ open class GenericGrpcUpstream( return emptyList() } + override fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? { + return null + } + override fun getUpstreamSettingsData(): Upstream.UpstreamSettingsData? { return null } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundService.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundService.kt index 2b19653ca..d23f81af3 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundService.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundService.kt @@ -35,5 +35,7 @@ abstract class LowerBoundService( fun getLowerBounds(): Collection = lowerBounds.values + fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? = lowerBounds[lowerBoundType] + protected abstract fun detectors(): List } diff --git a/src/test/groovy/io/emeraldpay/dshackle/test/GenericUpstreamMock.groovy b/src/test/groovy/io/emeraldpay/dshackle/test/GenericUpstreamMock.groovy index 2ae39c269..30a51becc 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/test/GenericUpstreamMock.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/test/GenericUpstreamMock.groovy @@ -112,4 +112,9 @@ class GenericUpstreamMock extends GenericUpstream { Collection getLowerBounds() { return List.of(new LowerBoundData(0, LowerBoundType.STATE)) } + + @Override + LowerBoundData getLowerBound(@NotNull LowerBoundType lowerBoundType) { + return new LowerBoundData(0, LowerBoundType.STATE) + } } diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/SelectorTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/SelectorTest.kt new file mode 100644 index 000000000..09887da99 --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/SelectorTest.kt @@ -0,0 +1,131 @@ +package io.emeraldpay.dshackle.upstream + +import io.emeraldpay.api.proto.BlockchainOuterClass +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.Arguments.of +import org.junit.jupiter.params.provider.MethodSource +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock + +class SelectorTest { + + @ParameterizedTest + @MethodSource("data") + fun `sort with lower height matcher`( + lowerBoundType: LowerBoundType, + protoLowerBoundType: BlockchainOuterClass.LowerBoundType, + ) { + val up1 = mock { + on { getLowerBound(lowerBoundType) } doReturn LowerBoundData(1, lowerBoundType) + } + val up2 = mock { + on { getLowerBound(lowerBoundType) } doReturn LowerBoundData(1000, lowerBoundType) + } + val up3 = mock { + on { getLowerBound(lowerBoundType) } doReturn LowerBoundData(100000, lowerBoundType) + } + val up4 = mock { + on { getLowerBound(lowerBoundType) } doReturn null + } + val ups = listOf(up4, up3, up2, up1) + val requestSelectors = listOf( + BlockchainOuterClass.Selector.newBuilder() + .setLowerHeightSelector( + BlockchainOuterClass.LowerHeightSelector.newBuilder() + .setLowerBoundType(protoLowerBoundType) + .build(), + ) + .build(), + ) + + val upstreamFilter = Selector.convertToUpstreamFilter(requestSelectors) + + val actual = ups.sortedWith(upstreamFilter.sort.comparator) + + assertEquals( + listOf(up1, up2, up3, up4), + actual, + ) + } + + @Test + fun `preserve the same order if no lower bound type`() { + val up1 = mock { + on { getLowerBound(LowerBoundType.STATE) } doReturn LowerBoundData(1, LowerBoundType.STATE) + } + val up2 = mock { + on { getLowerBound(LowerBoundType.STATE) } doReturn LowerBoundData(1000, LowerBoundType.STATE) + } + val up3 = mock { + on { getLowerBound(LowerBoundType.STATE) } doReturn LowerBoundData(100000, LowerBoundType.STATE) + } + val up4 = mock { + on { getLowerBound(LowerBoundType.STATE) } doReturn null + } + val ups = listOf(up4, up3, up2, up1) + val requestSelectors = listOf( + BlockchainOuterClass.Selector.newBuilder() + .setLowerHeightSelector( + BlockchainOuterClass.LowerHeightSelector.newBuilder() + .build(), + ) + .build(), + ) + + val upstreamFilter = Selector.convertToUpstreamFilter(requestSelectors) + + val actual = ups.sortedWith(upstreamFilter.sort.comparator) + + assertEquals( + listOf(up4, up3, up2, up1), + actual, + ) + } + + @Test + fun `ignore upstream with no needed lower bound`() { + val up1 = mock { + on { getLowerBound(LowerBoundType.STATE) } doReturn LowerBoundData(1, LowerBoundType.STATE) + } + val up2 = mock { + on { getLowerBound(LowerBoundType.BLOCK) } doReturn LowerBoundData(1000, LowerBoundType.BLOCK) + } + val up3 = mock { + on { getLowerBound(LowerBoundType.BLOCK) } doReturn LowerBoundData(100000, LowerBoundType.BLOCK) + } + val ups = listOf(up1, up3, up2) + val requestSelectors = listOf( + BlockchainOuterClass.Selector.newBuilder() + .setLowerHeightSelector( + BlockchainOuterClass.LowerHeightSelector.newBuilder() + .setLowerBoundType(BlockchainOuterClass.LowerBoundType.LOWER_BOUND_BLOCK) + .build(), + ) + .build(), + ) + + val upstreamFilter = Selector.convertToUpstreamFilter(requestSelectors) + + val actual = ups.sortedWith(upstreamFilter.sort.comparator) + + assertEquals( + listOf(up2, up3, up1), + actual, + ) + } + + companion object { + @JvmStatic + fun data(): List = + listOf( + of(LowerBoundType.STATE, BlockchainOuterClass.LowerBoundType.LOWER_BOUND_STATE), + of(LowerBoundType.BLOCK, BlockchainOuterClass.LowerBoundType.LOWER_BOUND_BLOCK), + of(LowerBoundType.SLOT, BlockchainOuterClass.LowerBoundType.LOWER_BOUND_SLOT), + ) + } +}