diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt index ddda94f6c..cc2a2c298 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt @@ -30,6 +30,16 @@ open class SchedulersConfig { return makeScheduler("head-scheduler", 4, monitoringConfig) } + @Bean + open fun wsScheduler(monitoringConfig: MonitoringConfig): Scheduler { + return makeScheduler("ws-scheduler", 4, monitoringConfig) + } + + @Bean + open fun headLivenessScheduler(monitoringConfig: MonitoringConfig): Scheduler { + return makeScheduler("head-liveness-scheduler", 4, monitoringConfig) + } + @Bean open fun wsConnectionResubscribeScheduler(monitoringConfig: MonitoringConfig): Scheduler { return makeScheduler("ws-connection-resubscribe-scheduler", 2, monitoringConfig) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt index 2c15e3455..1a423165e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt @@ -98,6 +98,10 @@ open class ConfiguredUpstreams( private val clientSpansInterceptor: ClientInterceptor?, @Qualifier("headScheduler") private val headScheduler: Scheduler, + @Qualifier("wsScheduler") + private val wsScheduler: Scheduler, + @Qualifier("headLivenessScheduler") + private val headLivenessScheduler: Scheduler, private val authorizationConfig: AuthorizationConfig, private val grpcAuthContext: GrpcAuthContext, ) : ApplicationRunner { @@ -482,7 +486,7 @@ open class ConfiguredUpstreams( chain, endpoint.url, endpoint.origin ?: URI("http://localhost"), - headScheduler, + wsScheduler, ).apply { config = endpoint basicAuth = endpoint.basicAuth @@ -518,6 +522,7 @@ open class ConfiguredUpstreams( blockValidator, wsConnectionResubscribeScheduler, headScheduler, + headLivenessScheduler, chainsConf.expectedBlockTime, ) if (!connectorFactory.isValid()) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt index 0b11eed40..10b1ddffb 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt @@ -25,7 +25,6 @@ import io.emeraldpay.dshackle.reader.Reader import io.emeraldpay.dshackle.upstream.BlockValidator import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Lifecycle -import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice import io.emeraldpay.dshackle.upstream.generic.ChainSpecific @@ -40,6 +39,7 @@ import reactor.core.publisher.Mono import reactor.core.publisher.Sinks import reactor.core.scheduler.Scheduler import java.time.Duration +import java.util.concurrent.atomic.AtomicReference class EthereumWsHead( forkChoice: ForkChoice, @@ -49,11 +49,12 @@ class EthereumWsHead( private val skipEnhance: Boolean, private val wsConnectionResubscribeScheduler: Scheduler, private val headScheduler: Scheduler, - private val upstream: DefaultUpstream, + upstream: DefaultUpstream, chainSpecific: ChainSpecific, ) : GenericHead(upstream.getId(), forkChoice, blockValidator, headScheduler, chainSpecific), Lifecycle { private var connectionId: String? = null + private var subscriptionId = AtomicReference("") private var subscribed = false private var connected = false private var isSyncing = false @@ -94,9 +95,6 @@ class EthereumWsHead( fun listenNewHeads(): Flux { return subscribe() - .transform { - Flux.concat(it.next().doOnNext { upstream.setStatus(UpstreamAvailability.OK) }, it) - } .map { val block = Global.objectMapper.readValue(it, BlockJson::class.java) as BlockJson if (!block.checkExtraData() && skipEnhance) { @@ -144,11 +142,10 @@ class EthereumWsHead( } } .timeout(Duration.ofSeconds(60), Mono.error(RuntimeException("No response from subscribe to newHeads"))) - .onErrorResume { - log.error("Error getting heads for $upstreamId - ${it.message}") - upstream.setStatus(UpstreamAvailability.UNAVAILABLE) + .onErrorResume { err -> + log.error("Error getting heads for $upstreamId - ${err.message}") subscribed = false - Mono.empty() + unsubscribe() } } @@ -158,11 +155,23 @@ class EthereumWsHead( noHeadUpdatesSink.tryEmitComplete() } + private fun unsubscribe(): Mono { + return wsSubscriptions.unsubscribe(subscriptionId.get()) + .flatMap { it.requireResult() } + .doOnNext { log.warn("{} has just unsubscribed from newHeads", upstreamId) } + .onErrorResume { + log.error("{} couldn't unsubscribe from newHeads", upstreamId, it) + Mono.empty() + } + .then(Mono.empty()) + } + private fun subscribe(): Flux { return try { wsSubscriptions.subscribe("newHeads") .also { connectionId = it.connectionId + subscriptionId = it.subId if (!connected) { connected = true } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt index c3136c958..522a813fa 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt @@ -3,9 +3,12 @@ package io.emeraldpay.dshackle.upstream.ethereum import io.emeraldpay.dshackle.ThrottledLogger import io.emeraldpay.dshackle.upstream.Head import org.slf4j.LoggerFactory +import org.springframework.scheduling.concurrent.CustomizableThreadFactory import reactor.core.publisher.Flux import reactor.core.scheduler.Scheduler +import reactor.core.scheduler.Schedulers import java.time.Duration +import java.util.concurrent.Executors class HeadLivenessValidator( private val head: Head, @@ -16,6 +19,8 @@ class HeadLivenessValidator( companion object { const val CHECKED_BLOCKS_UNTIL_LIVE = 3 private val log = LoggerFactory.getLogger(HeadLivenessValidator::class.java) + val schedulerTimeout = + Schedulers.fromExecutor(Executors.newScheduledThreadPool(4, CustomizableThreadFactory("timeout-liveness"))) } fun getFlux(): Flux { @@ -51,6 +56,7 @@ class HeadLivenessValidator( ThrottledLogger.log(log, "head liveness check broken with timeout in $upstreamId") } }, + schedulerTimeout, ).repeat().subscribeOn(scheduler) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt index c5893e5de..0d9cfd0d7 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt @@ -339,7 +339,7 @@ open class WsConnectionImpl( val status = subscriptionResponses.tryEmitNext(subscription) if (status.isFailure) { if (status == Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER) { - log.debug("No subscribers to WS response") + log.debug("No subscribers to WS response with id {} and result {}", msg.id, msg.value?.decodeToString()) } else { log.warn("Failed to proceed with a WS message: $status") } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptions.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptions.kt index 229855f17..8e38fb757 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptions.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptions.kt @@ -15,7 +15,10 @@ */ package io.emeraldpay.dshackle.upstream.ethereum +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.util.concurrent.atomic.AtomicReference /** * A JSON-RPC Subscription client. @@ -39,10 +42,13 @@ interface WsSubscriptions { */ fun subscribe(method: String): SubscribeData + fun unsubscribe(id: String): Mono + fun connectionInfoFlux(): Flux data class SubscribeData( val data: Flux, val connectionId: String, + val subId: AtomicReference, ) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt index c9dabe069..98a1487e9 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt @@ -17,6 +17,7 @@ package io.emeraldpay.dshackle.upstream.ethereum import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcException import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -52,7 +53,15 @@ class WsSubscriptionsImpl( } } - return WsSubscriptions.SubscribeData(messageFlux, conn.connectionId()) + return WsSubscriptions.SubscribeData(messageFlux, conn.connectionId(), subscriptionId) + } + + override fun unsubscribe(id: String): Mono { + if (id.isEmpty()) { + return Mono.empty() + } + return wsPool.getConnection() + .callRpc(JsonRpcRequest("eth_unsubscribe", listOf(id), ids.incrementAndGet())) } override fun connectionInfoFlux(): Flux = diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactory.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactory.kt index f52640ec5..042e9f8da 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactory.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactory.kt @@ -22,6 +22,7 @@ open class GenericConnectorFactory( private val blockValidator: BlockValidator, private val wsConnectionResubscribeScheduler: Scheduler, private val headScheduler: Scheduler, + private val headLivenessScheduler: Scheduler, private val expectedBlockTime: Duration, ) : ConnectorFactory { @@ -60,6 +61,7 @@ open class GenericConnectorFactory( skipEnhance, wsConnectionResubscribeScheduler, headScheduler, + headLivenessScheduler, expectedBlockTime, specific, ) @@ -77,6 +79,7 @@ open class GenericConnectorFactory( skipEnhance, wsConnectionResubscribeScheduler, headScheduler, + headLivenessScheduler, expectedBlockTime, specific, ) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt index 43ee5ea06..9f665dec7 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt @@ -40,6 +40,7 @@ class GenericRpcConnector( skipEnhance: Boolean, wsConnectionResubscribeScheduler: Scheduler, headScheduler: Scheduler, + headLivenessScheduler: Scheduler, expectedBlockTime: Duration, chainSpecific: ChainSpecific, ) : GenericConnector, CachesEnabled { @@ -110,7 +111,7 @@ class GenericRpcConnector( ) } } - liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler, id) + liveness = HeadLivenessValidator(head, expectedBlockTime, headLivenessScheduler, id) } override fun setCaches(caches: Caches) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt index 3bd33a2c1..6022074a0 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt @@ -26,6 +26,7 @@ class GenericWsConnector( skipEnhance: Boolean, wsConnectionResubscribeScheduler: Scheduler, headScheduler: Scheduler, + headLivenessScheduler: Scheduler, expectedBlockTime: Duration, chainSpecific: ChainSpecific, ) : GenericConnector { @@ -49,7 +50,7 @@ class GenericWsConnector( upstream, chainSpecific, ) - liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler, upstream.getId()) + liveness = HeadLivenessValidator(head, expectedBlockTime, headLivenessScheduler, upstream.getId()) subscriptions = EthereumWsIngressSubscription(wsSubscriptions) } diff --git a/src/test/groovy/io/emeraldpay/dshackle/startup/ConfiguredUpstreamsSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/startup/ConfiguredUpstreamsSpec.groovy index c3b66f95c..98d908951 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/startup/ConfiguredUpstreamsSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/startup/ConfiguredUpstreamsSpec.groovy @@ -35,6 +35,7 @@ class ConfiguredUpstreamsSpec extends Specification { Schedulers.boundedElastic(), null, Schedulers.boundedElastic(), + Schedulers.boundedElastic(), AuthorizationConfig.default(), new GrpcAuthContext() ) @@ -69,6 +70,7 @@ class ConfiguredUpstreamsSpec extends Specification { Schedulers.boundedElastic(), null, Schedulers.boundedElastic(), + Schedulers.boundedElastic(), AuthorizationConfig.default(), new GrpcAuthContext() ) @@ -102,6 +104,7 @@ class ConfiguredUpstreamsSpec extends Specification { Schedulers.boundedElastic(), null, Schedulers.boundedElastic(), + Schedulers.boundedElastic(), AuthorizationConfig.default(), new GrpcAuthContext() ) @@ -130,6 +133,7 @@ class ConfiguredUpstreamsSpec extends Specification { Schedulers.boundedElastic(), null, Schedulers.boundedElastic(), + Schedulers.boundedElastic(), AuthorizationConfig.default(), new GrpcAuthContext() ) @@ -163,6 +167,7 @@ class ConfiguredUpstreamsSpec extends Specification { Schedulers.boundedElastic(), null, Schedulers.boundedElastic(), + Schedulers.boundedElastic(), AuthorizationConfig.default(), new GrpcAuthContext() )