diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt index 2629a88d1..d2041cb5b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt @@ -138,12 +138,12 @@ open class EthereumLikeRpcUpstream( validatorSubscription = validator.start() .subscribe(this::setStatus) } -// livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({ -// hasLiveSubscriptionHead.set(it) -// eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UpstreamChangeEvent.ChangeType.UPDATED)) -// }, { -// log.debug("Error while checking live subscription for ${getId()}", it) -// },) + livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({ + hasLiveSubscriptionHead.set(it) + eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UpstreamChangeEvent.ChangeType.UPDATED)) + }, { + log.debug("Error while checking live subscription for ${getId()}", it) + },) } private fun updateLabels(label: Pair) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt index c3136c958..522a813fa 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt @@ -3,9 +3,12 @@ package io.emeraldpay.dshackle.upstream.ethereum import io.emeraldpay.dshackle.ThrottledLogger import io.emeraldpay.dshackle.upstream.Head import org.slf4j.LoggerFactory +import org.springframework.scheduling.concurrent.CustomizableThreadFactory import reactor.core.publisher.Flux import reactor.core.scheduler.Scheduler +import reactor.core.scheduler.Schedulers import java.time.Duration +import java.util.concurrent.Executors class HeadLivenessValidator( private val head: Head, @@ -16,6 +19,8 @@ class HeadLivenessValidator( companion object { const val CHECKED_BLOCKS_UNTIL_LIVE = 3 private val log = LoggerFactory.getLogger(HeadLivenessValidator::class.java) + val schedulerTimeout = + Schedulers.fromExecutor(Executors.newScheduledThreadPool(4, CustomizableThreadFactory("timeout-liveness"))) } fun getFlux(): Flux { @@ -51,6 +56,7 @@ class HeadLivenessValidator( ThrottledLogger.log(log, "head liveness check broken with timeout in $upstreamId") } }, + schedulerTimeout, ).repeat().subscribeOn(scheduler) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index 8ed42d0c3..b849206f7 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -7,6 +7,8 @@ import io.emeraldpay.dshackle.config.UpstreamsConfig.Labels import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.startup.QuorumForLabels +import io.emeraldpay.dshackle.startup.UpstreamChangeEvent +import io.emeraldpay.dshackle.startup.UpstreamChangeEvent.ChangeType.UPDATED import io.emeraldpay.dshackle.upstream.Capability import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Head @@ -81,12 +83,12 @@ class GenericUpstream( log.info("Configured for ${chain.chainName}") connector.start() -// livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({ -// hasLiveSubscriptionHead.set(it) -// eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UPDATED)) -// }, { -// log.debug("Error while checking live subscription for ${getId()}", it) -// },) + livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({ + hasLiveSubscriptionHead.set(it) + eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UPDATED)) + }, { + log.debug("Error while checking live subscription for ${getId()}", it) + },) } override fun stop() {