diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/context/MultistreamsConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/context/MultistreamsConfig.kt index fdf94a893..705bebf2b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/context/MultistreamsConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/context/MultistreamsConfig.kt @@ -14,6 +14,7 @@ import org.springframework.cloud.sleuth.Tracer import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import reactor.core.scheduler.Scheduler +import java.util.concurrent.CopyOnWriteArrayList @Configuration open class MultistreamsConfig(val beanFactory: ConfigurableListableBeanFactory) { @@ -46,7 +47,7 @@ open class MultistreamsConfig(val beanFactory: ConfigurableListableBeanFactory) return EthereumMultistream( chain, - ArrayList(), + CopyOnWriteArrayList(), cachesFactory.getCaches(chain), headScheduler, tracer @@ -63,7 +64,7 @@ open class MultistreamsConfig(val beanFactory: ConfigurableListableBeanFactory) return EthereumPosMultiStream( chain, - ArrayList(), + CopyOnWriteArrayList(), cachesFactory.getCaches(chain), headScheduler, tracer diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt index 153ed7cc5..e4aea9284 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt @@ -245,7 +245,10 @@ open class ConfiguredUpstreams( eventPublisher ) upstream.start() - if (!upstream.isRunning) return null + if (!upstream.isRunning) { + log.debug("Upstream ${upstream.getId()} is not running, it can't be added") + return null + } return upstream } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/UpstreamChangeEvent.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/UpstreamChangeEvent.kt index 82bd9b5cb..728e76b89 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/UpstreamChangeEvent.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/UpstreamChangeEvent.kt @@ -66,4 +66,8 @@ class UpstreamChangeEvent( upstream.setCaches(caches) } } + + override fun toString(): String { + return "UpstreamChangeEvent(chain=$chain, upstream=${upstream.getId()}, type=$type)" + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt index c57eaa6fc..2689db613 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt @@ -89,11 +89,9 @@ abstract class DefaultUpstream( } private fun statusByLag(lag: Long?, proposed: UpstreamAvailability): UpstreamAvailability { - if (lag == null) { - return UpstreamAvailability.UNAVAILABLE - } return if (proposed == UpstreamAvailability.OK) { when { + lag == null -> proposed lag > chainConfig.syncingLagSize -> UpstreamAvailability.SYNCING lag > chainConfig.laggingLagSize -> UpstreamAvailability.LAGGING else -> proposed diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/HeadLagObserver.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/HeadLagObserver.kt index bc2224db3..e5aebb7ce 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/HeadLagObserver.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/HeadLagObserver.kt @@ -62,11 +62,14 @@ abstract class HeadLagObserver( .sample(throttling) .flatMap(this::probeFollowers) .map { item -> + log.debug("Set lag ${item.t1} to upstream ${item.t2.getId()}") item.t2.setLag(item.t1) } } fun probeFollowers(top: BlockContainer): Flux> { + log.debug("Compute lag for ${followers.map { it.getId() }}") + return Flux.fromIterable(followers) .parallel(followers.size) .flatMap { up -> mapLagging(top, up, getCurrentBlocks(up)).subscribeOn(lagObserverScheduler) } @@ -87,6 +90,9 @@ abstract class HeadLagObserver( .doOnError { t -> log.warn("Failed to find distance for $up", t) } + .doOnNext { + log.debug("Lag for ${it.t2.getId()} is ${it.t1}") + } } open fun extractDistance(top: BlockContainer, curr: BlockContainer): Long { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index 623782a25..485825632 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -219,11 +219,13 @@ abstract class Multistream( } } } - lagObserver?.stop() - lagObserver = null when { - upstreams.size == 1 -> upstreams[0].setLag(0) - upstreams.size > 1 -> lagObserver = makeLagObserver() + upstreams.size == 1 -> { + lagObserver?.stop() + lagObserver = null + upstreams[0].setLag(0) + } + upstreams.size > 1 -> if (lagObserver == null) lagObserver = makeLagObserver() } } } @@ -387,6 +389,7 @@ abstract class Multistream( val chain = event.chain if (this.chain == chain) { eventLock.withLock { + log.debug("Processing event $event") when (event.type) { UpstreamChangeEvent.ChangeType.REVALIDATED -> {} UpstreamChangeEvent.ChangeType.UPDATED -> { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt index f87957717..f2bfbbda9 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt @@ -64,7 +64,10 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( ) .map(::resolve) .defaultIfEmpty(UpstreamAvailability.UNAVAILABLE) - .onErrorReturn(UpstreamAvailability.UNAVAILABLE) + .onErrorResume { + log.error("Error during upstream validation for ${upstream.getId()}", it) + Mono.just(UpstreamAvailability.UNAVAILABLE) + } } fun resolve(results: Tuple2): UpstreamAvailability { @@ -132,6 +135,9 @@ open class EthereumUpstreamValidator @JvmOverloads constructor( .flatMap { validate() } + .doOnNext { + log.debug("Status after validation is $it for ${upstream.getId()}") + } } fun validateUpstreamSettings(): Boolean { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum_pos/EthereumPosMultiStream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum_pos/EthereumPosMultiStream.kt index cd0fb23cb..c199b913b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum_pos/EthereumPosMultiStream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum_pos/EthereumPosMultiStream.kt @@ -107,7 +107,7 @@ open class EthereumPosMultiStream( } override fun makeLagObserver(): HeadLagObserver = - EthereumPosHeadLagObserver(head, ArrayList(upstreams), headScheduler).apply { + EthereumPosHeadLagObserver(head, upstreams, headScheduler).apply { start() } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/MultistreamSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/MultistreamSpec.groovy index 92ee5b3c2..a51ebe9be 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/MultistreamSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/MultistreamSpec.groovy @@ -306,6 +306,29 @@ class MultistreamSpec extends Specification { .verify(Duration.ofSeconds(3)) } + def "After removing upstreams lag observer is stopped"() { + setup: + def up1 = TestingCommons.upstream("test-1", "internal") + def up2 = TestingCommons.upstream("test-2", "external") + def up3 = TestingCommons.upstream("test-3", "external") + def multistream = new EthereumPosMultiStream(Chain.ETHEREUM__MAINNET, [up1, up2, up3], Caches.default(), Schedulers.boundedElastic(), TestingCommons.tracerMock()) + def observer = multistream.lagObserver + multistream.onUpstreamsUpdated() + + expect: + multistream.getAll().size() == 3 + observer.isRunning() + + multistream.getAll().with { + remove(0) + remove(1) + } + multistream.getAll().size() == 1 + multistream.onUpstreamsUpdated() + !observer.isRunning() + multistream.lagObserver == null + } + private BlockchainOuterClass.ChainStatus status(Common.AvailabilityEnum status) { return BlockchainOuterClass.ChainStatus.newBuilder() .setAvailability(status)