Skip to content

Commit

Permalink
Fix status during launch, prevent recreating lagObservers (#299)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Sep 18, 2023
1 parent bfdbce2 commit 2ac566b
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.springframework.cloud.sleuth.Tracer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import reactor.core.scheduler.Scheduler
import java.util.concurrent.CopyOnWriteArrayList

@Configuration
open class MultistreamsConfig(val beanFactory: ConfigurableListableBeanFactory) {
Expand Down Expand Up @@ -46,7 +47,7 @@ open class MultistreamsConfig(val beanFactory: ConfigurableListableBeanFactory)

return EthereumMultistream(
chain,
ArrayList(),
CopyOnWriteArrayList(),
cachesFactory.getCaches(chain),
headScheduler,
tracer
Expand All @@ -63,7 +64,7 @@ open class MultistreamsConfig(val beanFactory: ConfigurableListableBeanFactory)

return EthereumPosMultiStream(
chain,
ArrayList(),
CopyOnWriteArrayList(),
cachesFactory.getCaches(chain),
headScheduler,
tracer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,10 @@ open class ConfiguredUpstreams(
eventPublisher
)
upstream.start()
if (!upstream.isRunning) return null
if (!upstream.isRunning) {
log.debug("Upstream ${upstream.getId()} is not running, it can't be added")
return null
}
return upstream
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,8 @@ class UpstreamChangeEvent(
upstream.setCaches(caches)
}
}

override fun toString(): String {
return "UpstreamChangeEvent(chain=$chain, upstream=${upstream.getId()}, type=$type)"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,9 @@ abstract class DefaultUpstream(
}

private fun statusByLag(lag: Long?, proposed: UpstreamAvailability): UpstreamAvailability {
if (lag == null) {
return UpstreamAvailability.UNAVAILABLE
}
return if (proposed == UpstreamAvailability.OK) {
when {
lag == null -> proposed
lag > chainConfig.syncingLagSize -> UpstreamAvailability.SYNCING
lag > chainConfig.laggingLagSize -> UpstreamAvailability.LAGGING
else -> proposed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,14 @@ abstract class HeadLagObserver(
.sample(throttling)
.flatMap(this::probeFollowers)
.map { item ->
log.debug("Set lag ${item.t1} to upstream ${item.t2.getId()}")
item.t2.setLag(item.t1)
}
}

fun probeFollowers(top: BlockContainer): Flux<Tuple2<Long, Upstream>> {
log.debug("Compute lag for ${followers.map { it.getId() }}")

return Flux.fromIterable(followers)
.parallel(followers.size)
.flatMap { up -> mapLagging(top, up, getCurrentBlocks(up)).subscribeOn(lagObserverScheduler) }
Expand All @@ -87,6 +90,9 @@ abstract class HeadLagObserver(
.doOnError { t ->
log.warn("Failed to find distance for $up", t)
}
.doOnNext {
log.debug("Lag for ${it.t2.getId()} is ${it.t1}")
}
}

open fun extractDistance(top: BlockContainer, curr: BlockContainer): Long {
Expand Down
11 changes: 7 additions & 4 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,13 @@ abstract class Multistream(
}
}
}
lagObserver?.stop()
lagObserver = null
when {
upstreams.size == 1 -> upstreams[0].setLag(0)
upstreams.size > 1 -> lagObserver = makeLagObserver()
upstreams.size == 1 -> {
lagObserver?.stop()
lagObserver = null
upstreams[0].setLag(0)
}
upstreams.size > 1 -> if (lagObserver == null) lagObserver = makeLagObserver()
}
}
}
Expand Down Expand Up @@ -387,6 +389,7 @@ abstract class Multistream(
val chain = event.chain
if (this.chain == chain) {
eventLock.withLock {
log.debug("Processing event $event")
when (event.type) {
UpstreamChangeEvent.ChangeType.REVALIDATED -> {}
UpstreamChangeEvent.ChangeType.UPDATED -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ open class EthereumUpstreamValidator @JvmOverloads constructor(
)
.map(::resolve)
.defaultIfEmpty(UpstreamAvailability.UNAVAILABLE)
.onErrorReturn(UpstreamAvailability.UNAVAILABLE)
.onErrorResume {
log.error("Error during upstream validation for ${upstream.getId()}", it)
Mono.just(UpstreamAvailability.UNAVAILABLE)
}
}

fun resolve(results: Tuple2<UpstreamAvailability, UpstreamAvailability>): UpstreamAvailability {
Expand Down Expand Up @@ -132,6 +135,9 @@ open class EthereumUpstreamValidator @JvmOverloads constructor(
.flatMap {
validate()
}
.doOnNext {
log.debug("Status after validation is $it for ${upstream.getId()}")
}
}

fun validateUpstreamSettings(): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ open class EthereumPosMultiStream(
}

override fun makeLagObserver(): HeadLagObserver =
EthereumPosHeadLagObserver(head, ArrayList(upstreams), headScheduler).apply {
EthereumPosHeadLagObserver(head, upstreams, headScheduler).apply {
start()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,29 @@ class MultistreamSpec extends Specification {
.verify(Duration.ofSeconds(3))
}

def "After removing upstreams lag observer is stopped"() {
setup:
def up1 = TestingCommons.upstream("test-1", "internal")
def up2 = TestingCommons.upstream("test-2", "external")
def up3 = TestingCommons.upstream("test-3", "external")
def multistream = new EthereumPosMultiStream(Chain.ETHEREUM__MAINNET, [up1, up2, up3], Caches.default(), Schedulers.boundedElastic(), TestingCommons.tracerMock())
def observer = multistream.lagObserver
multistream.onUpstreamsUpdated()

expect:
multistream.getAll().size() == 3
observer.isRunning()

multistream.getAll().with {
remove(0)
remove(1)
}
multistream.getAll().size() == 1
multistream.onUpstreamsUpdated()
!observer.isRunning()
multistream.lagObserver == null
}

private BlockchainOuterClass.ChainStatus status(Common.AvailabilityEnum status) {
return BlockchainOuterClass.ChainStatus.newBuilder()
.setAvailability(status)
Expand Down

0 comments on commit 2ac566b

Please sign in to comment.