Skip to content

Commit

Permalink
Fix lower matcher - sort by lower bound type (#490)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored May 29, 2024
1 parent 5e35a88 commit f3ab6b3
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 3 deletions.
2 changes: 1 addition & 1 deletion emerald-grpc
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,10 @@ abstract class Multistream(
return lowerBounds.values
}

override fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? {
return lowerBounds[lowerBoundType]
}

override fun getUpstreamSettingsData(): Upstream.UpstreamSettingsData? {
return Upstream.UpstreamSettingsData(
nodeId(),
Expand Down
19 changes: 17 additions & 2 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import io.emeraldpay.dshackle.upstream.MatchesResponse.NotMatchedResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.SameNodeResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.SlotHeightResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.Success
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import org.apache.commons.lang3.StringUtils
import java.util.Collections

Expand Down Expand Up @@ -72,13 +73,27 @@ class Selector {
selectors.forEach { selector ->
if (selector.hasHeightSelector() && selector.heightSelector.height == -1L) {
return Sort(compareByDescending { it.getHead().getCurrentHeight() })
} else if (selector.hasLowerHeightSelector() && selector.lowerHeightSelector.height == 0L) {
return Sort(compareBy(nullsLast()) { it.getHead().getCurrentHeight() })
} else if (selector.hasLowerHeightSelector()) {
return Sort(
compareBy(nullsLast()) {
it.getLowerBound(fromProtoType(selector.lowerHeightSelector.lowerBoundType))?.lowerBound
},
)
}
}
return Sort.default
}

private fun fromProtoType(type: BlockchainOuterClass.LowerBoundType): LowerBoundType {
return when (type) {
BlockchainOuterClass.LowerBoundType.LOWER_BOUND_SLOT -> LowerBoundType.SLOT
BlockchainOuterClass.LowerBoundType.LOWER_BOUND_UNSPECIFIED -> LowerBoundType.UNKNOWN
BlockchainOuterClass.LowerBoundType.LOWER_BOUND_STATE -> LowerBoundType.STATE
BlockchainOuterClass.LowerBoundType.LOWER_BOUND_BLOCK -> LowerBoundType.BLOCK
BlockchainOuterClass.LowerBoundType.UNRECOGNIZED -> LowerBoundType.UNKNOWN
}
}

@JvmStatic
fun convertToMatcher(req: BlockchainOuterClass.Selector?): LabelSelectorMatcher {
return when {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ interface Upstream : Lifecycle {
fun getCapabilities(): Set<Capability>
fun isGrpc(): Boolean
fun getLowerBounds(): Collection<LowerBoundData>
fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData?
fun getUpstreamSettingsData(): UpstreamSettingsData?
fun updateLowerBound(lowerBound: Long, type: LowerBoundType)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import reactor.core.Disposable

open class BitcoinRpcUpstream(
Expand Down Expand Up @@ -76,6 +77,10 @@ open class BitcoinRpcUpstream(
return emptyList()
}

override fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? {
return null
}

override fun getUpstreamSettingsData(): Upstream.UpstreamSettingsData? {
return null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ open class GenericUpstream(
return lowerBoundService.getLowerBounds()
}

override fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? {
return lowerBoundService.getLowerBound(lowerBoundType)
}

override fun getUpstreamSettingsData(): Upstream.UpstreamSettingsData? {
return Upstream.UpstreamSettingsData(
nodeId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import io.emeraldpay.dshackle.upstream.bitcoin.ExtractBlock
import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException
import io.emeraldpay.dshackle.upstream.forkchoice.MostWorkForkChoice
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcGrpcClient
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import org.reactivestreams.Publisher
Expand Down Expand Up @@ -154,6 +155,10 @@ class BitcoinGrpcUpstream(
return emptyList()
}

override fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? {
return null
}

override fun getUpstreamSettingsData(): Upstream.UpstreamSettingsData? {
return null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.ethereum.domain.BlockHash
import io.emeraldpay.dshackle.upstream.forkchoice.NoChoiceWithPriorityForkChoice
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcGrpcClient
import reactor.core.publisher.Flux
import reactor.core.scheduler.Scheduler
Expand Down Expand Up @@ -183,6 +184,10 @@ open class GenericGrpcUpstream(
return emptyList()
}

override fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? {
return null
}

override fun getUpstreamSettingsData(): Upstream.UpstreamSettingsData? {
return null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,7 @@ abstract class LowerBoundService(

fun getLowerBounds(): Collection<LowerBoundData> = lowerBounds.values

fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? = lowerBounds[lowerBoundType]

protected abstract fun detectors(): List<LowerBoundDetector>
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,9 @@ class GenericUpstreamMock extends GenericUpstream {
Collection<LowerBoundData> getLowerBounds() {
return List.of(new LowerBoundData(0, LowerBoundType.STATE))
}

@Override
LowerBoundData getLowerBound(@NotNull LowerBoundType lowerBoundType) {
return new LowerBoundData(0, LowerBoundType.STATE)
}
}
131 changes: 131 additions & 0 deletions src/test/kotlin/io/emeraldpay/dshackle/upstream/SelectorTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package io.emeraldpay.dshackle.upstream

import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.Arguments.of
import org.junit.jupiter.params.provider.MethodSource
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.mock

class SelectorTest {

@ParameterizedTest
@MethodSource("data")
fun `sort with lower height matcher`(
lowerBoundType: LowerBoundType,
protoLowerBoundType: BlockchainOuterClass.LowerBoundType,
) {
val up1 = mock<Upstream> {
on { getLowerBound(lowerBoundType) } doReturn LowerBoundData(1, lowerBoundType)
}
val up2 = mock<Upstream> {
on { getLowerBound(lowerBoundType) } doReturn LowerBoundData(1000, lowerBoundType)
}
val up3 = mock<Upstream> {
on { getLowerBound(lowerBoundType) } doReturn LowerBoundData(100000, lowerBoundType)
}
val up4 = mock<Upstream> {
on { getLowerBound(lowerBoundType) } doReturn null
}
val ups = listOf(up4, up3, up2, up1)
val requestSelectors = listOf(
BlockchainOuterClass.Selector.newBuilder()
.setLowerHeightSelector(
BlockchainOuterClass.LowerHeightSelector.newBuilder()
.setLowerBoundType(protoLowerBoundType)
.build(),
)
.build(),
)

val upstreamFilter = Selector.convertToUpstreamFilter(requestSelectors)

val actual = ups.sortedWith(upstreamFilter.sort.comparator)

assertEquals(
listOf(up1, up2, up3, up4),
actual,
)
}

@Test
fun `preserve the same order if no lower bound type`() {
val up1 = mock<Upstream> {
on { getLowerBound(LowerBoundType.STATE) } doReturn LowerBoundData(1, LowerBoundType.STATE)
}
val up2 = mock<Upstream> {
on { getLowerBound(LowerBoundType.STATE) } doReturn LowerBoundData(1000, LowerBoundType.STATE)
}
val up3 = mock<Upstream> {
on { getLowerBound(LowerBoundType.STATE) } doReturn LowerBoundData(100000, LowerBoundType.STATE)
}
val up4 = mock<Upstream> {
on { getLowerBound(LowerBoundType.STATE) } doReturn null
}
val ups = listOf(up4, up3, up2, up1)
val requestSelectors = listOf(
BlockchainOuterClass.Selector.newBuilder()
.setLowerHeightSelector(
BlockchainOuterClass.LowerHeightSelector.newBuilder()
.build(),
)
.build(),
)

val upstreamFilter = Selector.convertToUpstreamFilter(requestSelectors)

val actual = ups.sortedWith(upstreamFilter.sort.comparator)

assertEquals(
listOf(up4, up3, up2, up1),
actual,
)
}

@Test
fun `ignore upstream with no needed lower bound`() {
val up1 = mock<Upstream> {
on { getLowerBound(LowerBoundType.STATE) } doReturn LowerBoundData(1, LowerBoundType.STATE)
}
val up2 = mock<Upstream> {
on { getLowerBound(LowerBoundType.BLOCK) } doReturn LowerBoundData(1000, LowerBoundType.BLOCK)
}
val up3 = mock<Upstream> {
on { getLowerBound(LowerBoundType.BLOCK) } doReturn LowerBoundData(100000, LowerBoundType.BLOCK)
}
val ups = listOf(up1, up3, up2)
val requestSelectors = listOf(
BlockchainOuterClass.Selector.newBuilder()
.setLowerHeightSelector(
BlockchainOuterClass.LowerHeightSelector.newBuilder()
.setLowerBoundType(BlockchainOuterClass.LowerBoundType.LOWER_BOUND_BLOCK)
.build(),
)
.build(),
)

val upstreamFilter = Selector.convertToUpstreamFilter(requestSelectors)

val actual = ups.sortedWith(upstreamFilter.sort.comparator)

assertEquals(
listOf(up2, up3, up1),
actual,
)
}

companion object {
@JvmStatic
fun data(): List<Arguments> =
listOf(
of(LowerBoundType.STATE, BlockchainOuterClass.LowerBoundType.LOWER_BOUND_STATE),
of(LowerBoundType.BLOCK, BlockchainOuterClass.LowerBoundType.LOWER_BOUND_BLOCK),
of(LowerBoundType.SLOT, BlockchainOuterClass.LowerBoundType.LOWER_BOUND_SLOT),
)
}
}

0 comments on commit f3ab6b3

Please sign in to comment.