From 93655a2da546cc7602ccb0cd697297274939dec2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB?= Date: Mon, 30 Oct 2023 15:56:37 +0400 Subject: [PATCH 1/7] Move ws connection to separate scheduler --- .../emeraldpay/dshackle/config/context/SchedulersConfig.kt | 5 +++++ .../io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt | 4 +++- .../dshackle/upstream/ethereum/WsConnectionImpl.kt | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt index ddda94f6c..709a13963 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt @@ -30,6 +30,11 @@ open class SchedulersConfig { return makeScheduler("head-scheduler", 4, monitoringConfig) } + @Bean + open fun wsScheduler(monitoringConfig: MonitoringConfig): Scheduler { + return makeScheduler("ws-scheduler", 4, monitoringConfig) + } + @Bean open fun wsConnectionResubscribeScheduler(monitoringConfig: MonitoringConfig): Scheduler { return makeScheduler("ws-connection-resubscribe-scheduler", 2, monitoringConfig) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt index 2c15e3455..6901a1814 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt @@ -98,6 +98,8 @@ open class ConfiguredUpstreams( private val clientSpansInterceptor: ClientInterceptor?, @Qualifier("headScheduler") private val headScheduler: Scheduler, + @Qualifier("wsScheduler") + private val wsScheduler: Scheduler, private val authorizationConfig: AuthorizationConfig, private val grpcAuthContext: GrpcAuthContext, ) : ApplicationRunner { @@ -482,7 +484,7 @@ open class ConfiguredUpstreams( chain, endpoint.url, endpoint.origin ?: URI("http://localhost"), - headScheduler, + wsScheduler, ).apply { config = endpoint basicAuth = endpoint.basicAuth diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt index c5893e5de..0d9cfd0d7 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt @@ -339,7 +339,7 @@ open class WsConnectionImpl( val status = subscriptionResponses.tryEmitNext(subscription) if (status.isFailure) { if (status == Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER) { - log.debug("No subscribers to WS response") + log.debug("No subscribers to WS response with id {} and result {}", msg.id, msg.value?.decodeToString()) } else { log.warn("Failed to proceed with a WS message: $status") } From c9aabe4bf03aaf208e4491d65c5ff4ecbde8791f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB?= Date: Mon, 30 Oct 2023 16:17:42 +0400 Subject: [PATCH 2/7] Fix test --- .../dshackle/startup/ConfiguredUpstreamsSpec.groovy | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/test/groovy/io/emeraldpay/dshackle/startup/ConfiguredUpstreamsSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/startup/ConfiguredUpstreamsSpec.groovy index c3b66f95c..98d908951 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/startup/ConfiguredUpstreamsSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/startup/ConfiguredUpstreamsSpec.groovy @@ -35,6 +35,7 @@ class ConfiguredUpstreamsSpec extends Specification { Schedulers.boundedElastic(), null, Schedulers.boundedElastic(), + Schedulers.boundedElastic(), AuthorizationConfig.default(), new GrpcAuthContext() ) @@ -69,6 +70,7 @@ class ConfiguredUpstreamsSpec extends Specification { Schedulers.boundedElastic(), null, Schedulers.boundedElastic(), + Schedulers.boundedElastic(), AuthorizationConfig.default(), new GrpcAuthContext() ) @@ -102,6 +104,7 @@ class ConfiguredUpstreamsSpec extends Specification { Schedulers.boundedElastic(), null, Schedulers.boundedElastic(), + Schedulers.boundedElastic(), AuthorizationConfig.default(), new GrpcAuthContext() ) @@ -130,6 +133,7 @@ class ConfiguredUpstreamsSpec extends Specification { Schedulers.boundedElastic(), null, Schedulers.boundedElastic(), + Schedulers.boundedElastic(), AuthorizationConfig.default(), new GrpcAuthContext() ) @@ -163,6 +167,7 @@ class ConfiguredUpstreamsSpec extends Specification { Schedulers.boundedElastic(), null, Schedulers.boundedElastic(), + Schedulers.boundedElastic(), AuthorizationConfig.default(), new GrpcAuthContext() ) From 1fbd581ca5ef90d7bbbca6fd1321bc84226b1ec3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB?= Date: Mon, 30 Oct 2023 19:31:49 +0400 Subject: [PATCH 3/7] Unsubscribe --- .../upstream/ethereum/EthereumWsHead.kt | 27 ++++++++++++------- .../upstream/ethereum/WsSubscriptions.kt | 6 +++++ .../upstream/ethereum/WsSubscriptionsImpl.kt | 13 +++++++-- 3 files changed, 35 insertions(+), 11 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt index 0b11eed40..10b1ddffb 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt @@ -25,7 +25,6 @@ import io.emeraldpay.dshackle.reader.Reader import io.emeraldpay.dshackle.upstream.BlockValidator import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Lifecycle -import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice import io.emeraldpay.dshackle.upstream.generic.ChainSpecific @@ -40,6 +39,7 @@ import reactor.core.publisher.Mono import reactor.core.publisher.Sinks import reactor.core.scheduler.Scheduler import java.time.Duration +import java.util.concurrent.atomic.AtomicReference class EthereumWsHead( forkChoice: ForkChoice, @@ -49,11 +49,12 @@ class EthereumWsHead( private val skipEnhance: Boolean, private val wsConnectionResubscribeScheduler: Scheduler, private val headScheduler: Scheduler, - private val upstream: DefaultUpstream, + upstream: DefaultUpstream, chainSpecific: ChainSpecific, ) : GenericHead(upstream.getId(), forkChoice, blockValidator, headScheduler, chainSpecific), Lifecycle { private var connectionId: String? = null + private var subscriptionId = AtomicReference("") private var subscribed = false private var connected = false private var isSyncing = false @@ -94,9 +95,6 @@ class EthereumWsHead( fun listenNewHeads(): Flux { return subscribe() - .transform { - Flux.concat(it.next().doOnNext { upstream.setStatus(UpstreamAvailability.OK) }, it) - } .map { val block = Global.objectMapper.readValue(it, BlockJson::class.java) as BlockJson if (!block.checkExtraData() && skipEnhance) { @@ -144,11 +142,10 @@ class EthereumWsHead( } } .timeout(Duration.ofSeconds(60), Mono.error(RuntimeException("No response from subscribe to newHeads"))) - .onErrorResume { - log.error("Error getting heads for $upstreamId - ${it.message}") - upstream.setStatus(UpstreamAvailability.UNAVAILABLE) + .onErrorResume { err -> + log.error("Error getting heads for $upstreamId - ${err.message}") subscribed = false - Mono.empty() + unsubscribe() } } @@ -158,11 +155,23 @@ class EthereumWsHead( noHeadUpdatesSink.tryEmitComplete() } + private fun unsubscribe(): Mono { + return wsSubscriptions.unsubscribe(subscriptionId.get()) + .flatMap { it.requireResult() } + .doOnNext { log.warn("{} has just unsubscribed from newHeads", upstreamId) } + .onErrorResume { + log.error("{} couldn't unsubscribe from newHeads", upstreamId, it) + Mono.empty() + } + .then(Mono.empty()) + } + private fun subscribe(): Flux { return try { wsSubscriptions.subscribe("newHeads") .also { connectionId = it.connectionId + subscriptionId = it.subId if (!connected) { connected = true } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptions.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptions.kt index 229855f17..8e38fb757 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptions.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptions.kt @@ -15,7 +15,10 @@ */ package io.emeraldpay.dshackle.upstream.ethereum +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.util.concurrent.atomic.AtomicReference /** * A JSON-RPC Subscription client. @@ -39,10 +42,13 @@ interface WsSubscriptions { */ fun subscribe(method: String): SubscribeData + fun unsubscribe(id: String): Mono + fun connectionInfoFlux(): Flux data class SubscribeData( val data: Flux, val connectionId: String, + val subId: AtomicReference, ) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt index c9dabe069..3f4aafcfa 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt @@ -17,6 +17,7 @@ package io.emeraldpay.dshackle.upstream.ethereum import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcException import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -37,7 +38,7 @@ class WsSubscriptionsImpl( val subscriptionId = AtomicReference("") val conn = wsPool.getConnection() val messages = conn.getSubscribeResponses() - .filter { it.subscriptionId == subscriptionId.get() } + // .filter { it.subscriptionId == subscriptionId.get() } .filter { it.result != null } // should never happen .map { it.result!! } @@ -52,7 +53,15 @@ class WsSubscriptionsImpl( } } - return WsSubscriptions.SubscribeData(messageFlux, conn.connectionId()) + return WsSubscriptions.SubscribeData(messageFlux, conn.connectionId(), subscriptionId) + } + + override fun unsubscribe(id: String): Mono { + if (id.isEmpty()) { + return Mono.empty() + } + return wsPool.getConnection() + .callRpc(JsonRpcRequest("eth_unsubscribe", listOf(id), ids.incrementAndGet())) } override fun connectionInfoFlux(): Flux = From ffbd0d043a4c3d8dcb5394c4aa3056d2ef4aed58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB?= Date: Mon, 30 Oct 2023 20:59:54 +0400 Subject: [PATCH 4/7] Unsubscribe --- .../dshackle/upstream/ethereum/WsSubscriptionsImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt index 3f4aafcfa..98a1487e9 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt @@ -38,7 +38,7 @@ class WsSubscriptionsImpl( val subscriptionId = AtomicReference("") val conn = wsPool.getConnection() val messages = conn.getSubscribeResponses() - // .filter { it.subscriptionId == subscriptionId.get() } + .filter { it.subscriptionId == subscriptionId.get() } .filter { it.result != null } // should never happen .map { it.result!! } From 0e1cf8a3d48bf4792d1a79f6215ddc13a01534f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB?= Date: Tue, 31 Oct 2023 02:02:32 +0400 Subject: [PATCH 5/7] new scheduler --- .../emeraldpay/dshackle/config/context/SchedulersConfig.kt | 5 +++++ .../io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt | 3 +++ .../upstream/generic/connectors/GenericConnectorFactory.kt | 3 +++ .../upstream/generic/connectors/GenericRpcConnector.kt | 3 ++- .../upstream/generic/connectors/GenericWsConnector.kt | 3 ++- 5 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt index 709a13963..cc2a2c298 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt @@ -35,6 +35,11 @@ open class SchedulersConfig { return makeScheduler("ws-scheduler", 4, monitoringConfig) } + @Bean + open fun headLivenessScheduler(monitoringConfig: MonitoringConfig): Scheduler { + return makeScheduler("head-liveness-scheduler", 4, monitoringConfig) + } + @Bean open fun wsConnectionResubscribeScheduler(monitoringConfig: MonitoringConfig): Scheduler { return makeScheduler("ws-connection-resubscribe-scheduler", 2, monitoringConfig) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt index 6901a1814..1a423165e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt @@ -100,6 +100,8 @@ open class ConfiguredUpstreams( private val headScheduler: Scheduler, @Qualifier("wsScheduler") private val wsScheduler: Scheduler, + @Qualifier("headLivenessScheduler") + private val headLivenessScheduler: Scheduler, private val authorizationConfig: AuthorizationConfig, private val grpcAuthContext: GrpcAuthContext, ) : ApplicationRunner { @@ -520,6 +522,7 @@ open class ConfiguredUpstreams( blockValidator, wsConnectionResubscribeScheduler, headScheduler, + headLivenessScheduler, chainsConf.expectedBlockTime, ) if (!connectorFactory.isValid()) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactory.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactory.kt index f52640ec5..042e9f8da 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactory.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactory.kt @@ -22,6 +22,7 @@ open class GenericConnectorFactory( private val blockValidator: BlockValidator, private val wsConnectionResubscribeScheduler: Scheduler, private val headScheduler: Scheduler, + private val headLivenessScheduler: Scheduler, private val expectedBlockTime: Duration, ) : ConnectorFactory { @@ -60,6 +61,7 @@ open class GenericConnectorFactory( skipEnhance, wsConnectionResubscribeScheduler, headScheduler, + headLivenessScheduler, expectedBlockTime, specific, ) @@ -77,6 +79,7 @@ open class GenericConnectorFactory( skipEnhance, wsConnectionResubscribeScheduler, headScheduler, + headLivenessScheduler, expectedBlockTime, specific, ) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt index 43ee5ea06..9f665dec7 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt @@ -40,6 +40,7 @@ class GenericRpcConnector( skipEnhance: Boolean, wsConnectionResubscribeScheduler: Scheduler, headScheduler: Scheduler, + headLivenessScheduler: Scheduler, expectedBlockTime: Duration, chainSpecific: ChainSpecific, ) : GenericConnector, CachesEnabled { @@ -110,7 +111,7 @@ class GenericRpcConnector( ) } } - liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler, id) + liveness = HeadLivenessValidator(head, expectedBlockTime, headLivenessScheduler, id) } override fun setCaches(caches: Caches) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt index 3bd33a2c1..6022074a0 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt @@ -26,6 +26,7 @@ class GenericWsConnector( skipEnhance: Boolean, wsConnectionResubscribeScheduler: Scheduler, headScheduler: Scheduler, + headLivenessScheduler: Scheduler, expectedBlockTime: Duration, chainSpecific: ChainSpecific, ) : GenericConnector { @@ -49,7 +50,7 @@ class GenericWsConnector( upstream, chainSpecific, ) - liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler, upstream.getId()) + liveness = HeadLivenessValidator(head, expectedBlockTime, headLivenessScheduler, upstream.getId()) subscriptions = EthereumWsIngressSubscription(wsSubscriptions) } From c9a40aed2343b901a4d85e10cd12363119727387 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB?= Date: Tue, 31 Oct 2023 13:00:24 +0400 Subject: [PATCH 6/7] Remove head liveness --- .../upstream/ethereum/EthereumLikeRpcUpstream.kt | 12 ++++++------ .../dshackle/upstream/generic/GenericUpstream.kt | 14 ++++++-------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt index d2041cb5b..2629a88d1 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt @@ -138,12 +138,12 @@ open class EthereumLikeRpcUpstream( validatorSubscription = validator.start() .subscribe(this::setStatus) } - livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({ - hasLiveSubscriptionHead.set(it) - eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UpstreamChangeEvent.ChangeType.UPDATED)) - }, { - log.debug("Error while checking live subscription for ${getId()}", it) - },) +// livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({ +// hasLiveSubscriptionHead.set(it) +// eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UpstreamChangeEvent.ChangeType.UPDATED)) +// }, { +// log.debug("Error while checking live subscription for ${getId()}", it) +// },) } private fun updateLabels(label: Pair) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index b849206f7..8ed42d0c3 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -7,8 +7,6 @@ import io.emeraldpay.dshackle.config.UpstreamsConfig.Labels import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.startup.QuorumForLabels -import io.emeraldpay.dshackle.startup.UpstreamChangeEvent -import io.emeraldpay.dshackle.startup.UpstreamChangeEvent.ChangeType.UPDATED import io.emeraldpay.dshackle.upstream.Capability import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Head @@ -83,12 +81,12 @@ class GenericUpstream( log.info("Configured for ${chain.chainName}") connector.start() - livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({ - hasLiveSubscriptionHead.set(it) - eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UPDATED)) - }, { - log.debug("Error while checking live subscription for ${getId()}", it) - },) +// livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({ +// hasLiveSubscriptionHead.set(it) +// eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UPDATED)) +// }, { +// log.debug("Error while checking live subscription for ${getId()}", it) +// },) } override fun stop() { From 5298e5a3ea1d22f1c87017c3a4f21d648544382f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB?= Date: Tue, 31 Oct 2023 13:29:25 +0400 Subject: [PATCH 7/7] Return liveness --- .../upstream/ethereum/EthereumLikeRpcUpstream.kt | 12 ++++++------ .../upstream/ethereum/HeadLivenessValidator.kt | 6 ++++++ .../dshackle/upstream/generic/GenericUpstream.kt | 14 ++++++++------ 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt index 2629a88d1..d2041cb5b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLikeRpcUpstream.kt @@ -138,12 +138,12 @@ open class EthereumLikeRpcUpstream( validatorSubscription = validator.start() .subscribe(this::setStatus) } -// livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({ -// hasLiveSubscriptionHead.set(it) -// eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UpstreamChangeEvent.ChangeType.UPDATED)) -// }, { -// log.debug("Error while checking live subscription for ${getId()}", it) -// },) + livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({ + hasLiveSubscriptionHead.set(it) + eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UpstreamChangeEvent.ChangeType.UPDATED)) + }, { + log.debug("Error while checking live subscription for ${getId()}", it) + },) } private fun updateLabels(label: Pair) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt index c3136c958..522a813fa 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt @@ -3,9 +3,12 @@ package io.emeraldpay.dshackle.upstream.ethereum import io.emeraldpay.dshackle.ThrottledLogger import io.emeraldpay.dshackle.upstream.Head import org.slf4j.LoggerFactory +import org.springframework.scheduling.concurrent.CustomizableThreadFactory import reactor.core.publisher.Flux import reactor.core.scheduler.Scheduler +import reactor.core.scheduler.Schedulers import java.time.Duration +import java.util.concurrent.Executors class HeadLivenessValidator( private val head: Head, @@ -16,6 +19,8 @@ class HeadLivenessValidator( companion object { const val CHECKED_BLOCKS_UNTIL_LIVE = 3 private val log = LoggerFactory.getLogger(HeadLivenessValidator::class.java) + val schedulerTimeout = + Schedulers.fromExecutor(Executors.newScheduledThreadPool(4, CustomizableThreadFactory("timeout-liveness"))) } fun getFlux(): Flux { @@ -51,6 +56,7 @@ class HeadLivenessValidator( ThrottledLogger.log(log, "head liveness check broken with timeout in $upstreamId") } }, + schedulerTimeout, ).repeat().subscribeOn(scheduler) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index 8ed42d0c3..b849206f7 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -7,6 +7,8 @@ import io.emeraldpay.dshackle.config.UpstreamsConfig.Labels import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.startup.QuorumForLabels +import io.emeraldpay.dshackle.startup.UpstreamChangeEvent +import io.emeraldpay.dshackle.startup.UpstreamChangeEvent.ChangeType.UPDATED import io.emeraldpay.dshackle.upstream.Capability import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Head @@ -81,12 +83,12 @@ class GenericUpstream( log.info("Configured for ${chain.chainName}") connector.start() -// livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({ -// hasLiveSubscriptionHead.set(it) -// eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UPDATED)) -// }, { -// log.debug("Error while checking live subscription for ${getId()}", it) -// },) + livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({ + hasLiveSubscriptionHead.set(it) + eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UPDATED)) + }, { + log.debug("Error while checking live subscription for ${getId()}", it) + },) } override fun stop() {