Skip to content

Commit

Permalink
Return liveness
Browse files Browse the repository at this point in the history
  • Loading branch information
Кирилл committed Oct 31, 2023
1 parent c9a40ae commit 5298e5a
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,12 @@ open class EthereumLikeRpcUpstream(
validatorSubscription = validator.start()
.subscribe(this::setStatus)
}
// livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({
// hasLiveSubscriptionHead.set(it)
// eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UpstreamChangeEvent.ChangeType.UPDATED))
// }, {
// log.debug("Error while checking live subscription for ${getId()}", it)
// },)
livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({
hasLiveSubscriptionHead.set(it)
eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UpstreamChangeEvent.ChangeType.UPDATED))
}, {
log.debug("Error while checking live subscription for ${getId()}", it)
},)
}

private fun updateLabels(label: Pair<String, String>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package io.emeraldpay.dshackle.upstream.ethereum
import io.emeraldpay.dshackle.ThrottledLogger
import io.emeraldpay.dshackle.upstream.Head
import org.slf4j.LoggerFactory
import org.springframework.scheduling.concurrent.CustomizableThreadFactory
import reactor.core.publisher.Flux
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import java.time.Duration
import java.util.concurrent.Executors

class HeadLivenessValidator(
private val head: Head,
Expand All @@ -16,6 +19,8 @@ class HeadLivenessValidator(
companion object {
const val CHECKED_BLOCKS_UNTIL_LIVE = 3
private val log = LoggerFactory.getLogger(HeadLivenessValidator::class.java)
val schedulerTimeout =
Schedulers.fromExecutor(Executors.newScheduledThreadPool(4, CustomizableThreadFactory("timeout-liveness")))
}

fun getFlux(): Flux<Boolean> {
Expand Down Expand Up @@ -51,6 +56,7 @@ class HeadLivenessValidator(
ThrottledLogger.log(log, "head liveness check broken with timeout in $upstreamId")
}
},
schedulerTimeout,
).repeat().subscribeOn(scheduler)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import io.emeraldpay.dshackle.config.UpstreamsConfig.Labels
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.startup.UpstreamChangeEvent
import io.emeraldpay.dshackle.startup.UpstreamChangeEvent.ChangeType.UPDATED
import io.emeraldpay.dshackle.upstream.Capability
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.Head
Expand Down Expand Up @@ -81,12 +83,12 @@ class GenericUpstream(
log.info("Configured for ${chain.chainName}")
connector.start()

// livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({
// hasLiveSubscriptionHead.set(it)
// eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UPDATED))
// }, {
// log.debug("Error while checking live subscription for ${getId()}", it)
// },)
livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({
hasLiveSubscriptionHead.set(it)
eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UPDATED))
}, {
log.debug("Error while checking live subscription for ${getId()}", it)
},)
}

override fun stop() {
Expand Down

0 comments on commit 5298e5a

Please sign in to comment.