From 4fc05ea8a5b5280fb912cf684b121e2f434401e0 Mon Sep 17 00:00:00 2001 From: a10zn8 Date: Wed, 17 Jul 2024 17:39:19 +0300 Subject: [PATCH] more logs for ws --- .../emeraldpay/dshackle/commons/DurableFlux.kt | 3 ++- .../dshackle/commons/SharedFluxHolder.kt | 8 +++++++- .../ethereum/subscribe/NoPendingTxes.kt | 7 ------- .../generic/GenericEgressSubscription.kt | 11 +++++++++++ .../generic/GenericIngressSubscription.kt | 18 ++++++++++++++++-- 5 files changed, 36 insertions(+), 11 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/commons/DurableFlux.kt b/src/main/kotlin/io/emeraldpay/dshackle/commons/DurableFlux.kt index 505304c37..3505ecee8 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/commons/DurableFlux.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/commons/DurableFlux.kt @@ -43,9 +43,10 @@ class DurableFlux( messagesSinceStart = 0 } .onErrorResume { t -> + log.warn("Error during durable flux processing", t) val backoff = errorBackOffExecution.nextBackOff() if (backoff != BackOffExecution.STOP) { - log.warn("Connection closed with ${t.message}. Reconnecting in ${backoff}ms") + log.warn("${t.message}. Reconnecting in ${backoff}ms") connect().delaySubscription(Duration.ofMillis(backoff)) } else { log.warn("Connection closed with ${t.message}. Not reconnecting") diff --git a/src/main/kotlin/io/emeraldpay/dshackle/commons/SharedFluxHolder.kt b/src/main/kotlin/io/emeraldpay/dshackle/commons/SharedFluxHolder.kt index 0229b4c3b..1dd0f386f 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/commons/SharedFluxHolder.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/commons/SharedFluxHolder.kt @@ -40,7 +40,13 @@ class SharedFluxHolder( val created = Holder( provider.invoke() .share() - .doFinally { onClose(id) }, + .doOnError { + log.warn("Shared flux error", it) + } + .doFinally { + log.warn("Shared flux finished {}", it) + onClose(id) + }, id, ) lock.write { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/NoPendingTxes.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/NoPendingTxes.kt index c922de568..850d77436 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/NoPendingTxes.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/NoPendingTxes.kt @@ -17,17 +17,10 @@ package io.emeraldpay.dshackle.upstream.ethereum.subscribe import io.emeraldpay.dshackle.upstream.Selector import io.emeraldpay.dshackle.upstream.ethereum.domain.TransactionId -import org.slf4j.LoggerFactory import reactor.core.publisher.Flux class NoPendingTxes : PendingTxesSource { - companion object { - private val log = LoggerFactory.getLogger(NoPendingTxes::class.java) - - val DEFAULT = NoPendingTxes() - } - override fun connect(matched: Selector.Matcher): Flux { return Flux.empty() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericEgressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericEgressSubscription.kt index 06fca1fe3..e8d4d1cae 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericEgressSubscription.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericEgressSubscription.kt @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream.generic import io.emeraldpay.dshackle.upstream.EgressSubscription import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Selector.Matcher +import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.scheduler.Scheduler @@ -10,6 +11,11 @@ class GenericEgressSubscription( val multistream: Multistream, val scheduler: Scheduler, ) : EgressSubscription { + + companion object { + private val log = LoggerFactory.getLogger(GenericEgressSubscription::class.java) + } + override fun getAvailableTopics(): List { return multistream.getUpstreams() .flatMap { (it as GenericUpstream).getIngressSubscription().getAvailableTopics() } @@ -22,6 +28,11 @@ class GenericEgressSubscription( .shuffled() .first { matcher.matches(it) } as GenericUpstream + val result = up.getIngressSubscription().get(topic, params)?.connect(matcher) + if (result == null) { + log.warn("subscription source not found for topic {}", topic) + return Flux.empty() + } return up.getIngressSubscription().get(topic, params)?.connect(matcher) ?: Flux.empty() } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt index 4dbd83901..d784318a2 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt @@ -6,10 +6,12 @@ import io.emeraldpay.dshackle.upstream.SubscriptionConnect import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions import io.emeraldpay.dshackle.upstream.generic.subscribe.GenericPersistentConnect import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.time.Duration import java.util.concurrent.ConcurrentHashMap +import kotlin.math.log class GenericIngressSubscription(val conn: WsSubscriptions, val methods: List) : IngressSubscription { override fun getAvailableTopics(): List { @@ -36,12 +38,24 @@ class GenericSubscriptionConnect( val params: Any?, ) : GenericPersistentConnect() { + companion object { + private val log = LoggerFactory.getLogger(GenericSubscriptionConnect::class.java) + } + @Suppress("UNCHECKED_CAST") override fun createConnection(): Flux { return conn.subscribe(ChainRequest(topic, ListParams(getParams(params) as List))) .data - .timeout(Duration.ofSeconds(60), Mono.empty()) - .onErrorResume { Mono.empty() } as Flux + .timeout( + Duration.ofSeconds(60), + Mono.empty().doOnEach { + log.warn("Timeout during subscription to $topic") + }, + ) + .onErrorResume { + log.error("Error during subscription to $topic", it) + Mono.empty() + } as Flux } private fun getParams(params: Any?): List {