From 377595f7d16526bd7ee990fb0bf7f3a16327b5bf Mon Sep 17 00:00:00 2001 From: KirillPamPam Date: Wed, 3 Jul 2024 15:19:29 +0400 Subject: [PATCH] Fix subscription event update (#517) --- .../io/emeraldpay/dshackle/upstream/Multistream.kt | 2 +- .../dshackle/upstream/state/MultistreamState.kt | 5 +++-- .../dshackle/upstream/state/MultistreamStateTest.kt | 10 +++++++--- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index 35718784b..70d276c30 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -220,7 +220,7 @@ abstract class Multistream( protected open fun onUpstreamsUpdated() { val upstreams = getAll() - state.updateState(upstreams, getSubscriptionTopics()) + state.updateState(upstreams, getEgressSubscription()) when { upstreams.size == 1 -> { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamState.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamState.kt index fe6c38c74..7c956b258 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamState.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamState.kt @@ -4,6 +4,7 @@ import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.startup.QuorumForLabels import io.emeraldpay.dshackle.upstream.Capability import io.emeraldpay.dshackle.upstream.DefaultUpstream +import io.emeraldpay.dshackle.upstream.EgressSubscription import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.calls.AggregatedCallMethods @@ -59,7 +60,7 @@ class MultistreamState( return status } - fun updateState(upstreams: List, subs: List) { + fun updateState(upstreams: List, egressSubscription: EgressSubscription) { val oldState = CurrentMultistreamState(this) val availableUpstreams = upstreams.filter { it.isAvailable() } @@ -68,7 +69,7 @@ class MultistreamState( updateQuorumLabels(availableUpstreams) updateUpstreamBounds(availableUpstreams) status = if (upstreams.isEmpty()) UpstreamAvailability.UNAVAILABLE else upstreams.minOf { it.getStatus() } - this.subs = subs + this.subs = egressSubscription.getAvailableTopics() stateEvents.emitNext( stateHandler.compareStates(oldState, CurrentMultistreamState(this)), diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamStateTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamStateTest.kt index c6f738fd3..c8d91e126 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamStateTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/state/MultistreamStateTest.kt @@ -4,6 +4,7 @@ import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.startup.QuorumForLabels import io.emeraldpay.dshackle.upstream.Capability import io.emeraldpay.dshackle.upstream.DefaultUpstream +import io.emeraldpay.dshackle.upstream.EgressSubscription import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.finalization.FinalizationData @@ -57,11 +58,14 @@ class MultistreamStateTest { listOf(LowerBoundData(1, LowerBoundType.STATE), LowerBoundData(1, LowerBoundType.BLOCK)), listOf(FinalizationData(990, FinalizationType.SAFE_BLOCK), FinalizationData(880, FinalizationType.FINALIZED_BLOCK)), ) + val egressSub = mock { + on { getAvailableTopics() } doReturn listOf("heads", "notHeads") + } val state = MultistreamState {} StepVerifier.create(state.stateEvents()) - .then { state.updateState(listOf(up1, up2, up3), listOf("heads", "notHeads")) } + .then { state.updateState(listOf(up1, up2, up3), egressSub) } .assertNext { assertThat(it).hasSize(7) assertThat(it.toList()) @@ -93,11 +97,11 @@ class MultistreamStateTest { ), ) } - .then { state.updateState(listOf(up1, up2, up3), listOf("heads", "notHeads")) } + .then { state.updateState(listOf(up1, up2, up3), egressSub) } .assertNext { assertThat(it).hasSize(0) } - .then { state.updateState(listOf(up1, up2, up3, up4), listOf("heads", "notHeads")) } + .then { state.updateState(listOf(up1, up2, up3, up4), egressSub) } .assertNext { assertThat(it).hasSize(4) assertThat(it.toList())