diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsConnectionPoolFactory.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsConnectionPoolFactory.kt index 3d76b6c79..8ea17b1b2 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsConnectionPoolFactory.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsConnectionPoolFactory.kt @@ -24,9 +24,9 @@ class EthereumWsConnectionPoolFactory( private val ethereumWsConnectionFactory: EthereumWsConnectionFactory ) { - fun create(upstream: DefaultUpstream?): WsConnectionPool { - require(upstream == null || upstream.getId() == id) { - "Creating instance for different upstream. ${upstream?.getId()} != id" + fun create(upstream: DefaultUpstream): WsConnectionPool { + require(upstream.getId() == id) { + "Creating instance for different upstream. ${upstream.getId()} != id" } return if (connections > 1) { WsConnectionMultiPool(ethereumWsConnectionFactory, upstream, connections) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt index 54896dad3..04531b194 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt @@ -23,7 +23,9 @@ import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.reader.Reader import io.emeraldpay.dshackle.upstream.BlockValidator +import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Lifecycle +import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest @@ -38,7 +40,6 @@ import reactor.core.scheduler.Scheduler import java.time.Duration class EthereumWsHead( - upstreamId: String, forkChoice: ForkChoice, blockValidator: BlockValidator, private val api: JsonRpcReader, @@ -46,7 +47,8 @@ class EthereumWsHead( private val skipEnhance: Boolean, private val wsConnectionResubscribeScheduler: Scheduler, private val headScheduler: Scheduler, -) : DefaultEthereumHead(upstreamId, forkChoice, blockValidator, headScheduler), Lifecycle { + private val upstream: DefaultUpstream +) : DefaultEthereumHead(upstream.getId(), forkChoice, blockValidator, headScheduler), Lifecycle { private var connectionId: String? = null private var subscribed = false @@ -89,6 +91,9 @@ class EthereumWsHead( fun listenNewHeads(): Flux { return subscribe() + .transform { + Flux.concat(it.next().doOnNext { upstream.setStatus(UpstreamAvailability.OK) }, it) + } .map { val block = Global.objectMapper.readValue(it, BlockJson::class.java) as BlockJson if (!block.checkExtraData() && skipEnhance) { @@ -137,6 +142,8 @@ class EthereumWsHead( } .timeout(Duration.ofSeconds(60), Mono.error(RuntimeException("No response from subscribe to newHeads"))) .onErrorResume { + log.error("Error getting heads for $upstreamId - ${it.message}") + upstream.setStatus(UpstreamAvailability.UNAVAILABLE) subscribed = false Mono.empty() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionMultiPool.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionMultiPool.kt index 74fcb4007..a15e3dc60 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionMultiPool.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionMultiPool.kt @@ -39,7 +39,7 @@ import kotlin.concurrent.write */ class WsConnectionMultiPool( private val ethereumWsConnectionFactory: EthereumWsConnectionFactory, - private val upstream: DefaultUpstream?, + private val upstream: DefaultUpstream, private val connections: Int, ) : WsConnectionPool { @@ -112,7 +112,7 @@ class WsConnectionMultiPool( current.add( ethereumWsConnectionFactory.createWsConnection(connIndex++) { if (isUnavailable()) { - upstream?.setStatus(UpstreamAvailability.UNAVAILABLE) + upstream.setStatus(UpstreamAvailability.UNAVAILABLE) } }.also { it.connect() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionSinglePool.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionSinglePool.kt index fac22b7d2..90894d97f 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionSinglePool.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionSinglePool.kt @@ -21,10 +21,10 @@ import reactor.core.publisher.Flux class WsConnectionSinglePool( ethereumWsConnectionFactory: EthereumWsConnectionFactory, - private val upstream: DefaultUpstream?, + private val upstream: DefaultUpstream, ) : WsConnectionPool { private val connection = ethereumWsConnectionFactory.createWsConnection { - upstream?.setStatus(UpstreamAvailability.UNAVAILABLE) + upstream.setStatus(UpstreamAvailability.UNAVAILABLE) } override fun connect() { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumConnectorFactory.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumConnectorFactory.kt index a25fe4964..88ead79bd 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumConnectorFactory.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumConnectorFactory.kt @@ -70,7 +70,7 @@ open class EthereumConnectorFactory( connectorType, httpFactory.create(upstream.getId(), chain), wsFactory, - upstream.getId(), + upstream, forkChoice, blockValidator, skipEnhance, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumRpcConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumRpcConnector.kt index 5c1732db4..284bbe932 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumRpcConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumRpcConnector.kt @@ -4,6 +4,7 @@ import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.cache.CachesEnabled import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.upstream.BlockValidator +import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.Lifecycle import io.emeraldpay.dshackle.upstream.MergedHead @@ -31,7 +32,7 @@ class EthereumRpcConnector( connectorType: ConnectorMode, private val directReader: JsonRpcReader, wsFactory: EthereumWsConnectionPoolFactory?, - id: String, + upstream: DefaultUpstream, forkChoice: ForkChoice, blockValidator: BlockValidator, skipEnhance: Boolean, @@ -39,6 +40,7 @@ class EthereumRpcConnector( headScheduler: Scheduler, expectedBlockTime: Duration ) : EthereumConnector, CachesEnabled { + private val id = upstream.getId() private val pool: WsConnectionPool? private val head: Head private val liveness: HeadLivenessValidator @@ -52,7 +54,7 @@ class EthereumRpcConnector( } init { - pool = wsFactory?.create(null) + pool = wsFactory?.create(upstream) head = when (connectorType) { RPC_ONLY -> { @@ -67,14 +69,14 @@ class EthereumRpcConnector( RPC_REQUESTS_WITH_MIXED_HEAD -> { val wsHead = EthereumWsHead( - id, AlwaysForkChoice(), blockValidator, getIngressReader(), WsSubscriptionsImpl(pool!!), skipEnhance, wsConnectionResubscribeScheduler, - headScheduler + headScheduler, + upstream ) // receive all new blocks through WebSockets, but also periodically verify with RPC in case if WS failed val rpcHead = @@ -91,11 +93,11 @@ class EthereumRpcConnector( RPC_REQUESTS_WITH_WS_HEAD -> { EthereumWsHead( - id, AlwaysForkChoice(), blockValidator, getIngressReader(), WsSubscriptionsImpl(pool!!), skipEnhance, wsConnectionResubscribeScheduler, - headScheduler + headScheduler, + upstream ) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumWsConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumWsConnector.kt index 9755c578d..d5f1870e3 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumWsConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumWsConnector.kt @@ -37,14 +37,14 @@ class EthereumWsConnector( reader = JsonRpcWsClient(pool) val wsSubscriptions = WsSubscriptionsImpl(pool) head = EthereumWsHead( - upstream.getId(), forkChoice, blockValidator, reader, wsSubscriptions, skipEnhance, wsConnectionResubscribeScheduler, - headScheduler + headScheduler, + upstream ) liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler, upstream.getId()) subscriptions = EthereumWsIngressSubscription(wsSubscriptions) diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHeadSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHeadSpec.groovy index 8cbf8f22d..8f9e4d9a7 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHeadSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHeadSpec.groovy @@ -15,14 +15,17 @@ */ package io.emeraldpay.dshackle.upstream.ethereum +import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.test.EthereumPosRpcUpstreamMock import io.emeraldpay.dshackle.test.TestingCommons import io.emeraldpay.dshackle.upstream.BlockValidator +import io.emeraldpay.dshackle.upstream.DefaultUpstream +import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson import io.emeraldpay.dshackle.upstream.forkchoice.AlwaysForkChoice import io.emeraldpay.etherjar.domain.BlockHash import io.emeraldpay.etherjar.domain.TransactionId -import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson import io.emeraldpay.etherjar.rpc.json.TransactionRefJson import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -38,6 +41,7 @@ import java.time.temporal.ChronoUnit class EthereumWsHeadSpec extends Specification { BlockHash parent = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200") + DefaultUpstream upstream = new EthereumPosRpcUpstreamMock(Chain.ETHEREUM__MAINNET, TestingCommons.api()) def "Fetch block"() { setup: @@ -66,7 +70,7 @@ class EthereumWsHeadSpec extends Specification { 1 * it.connectionInfoFlux() >> Flux.empty() } - def head = new EthereumWsHead("fake", new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, false, Schedulers.boundedElastic(), Schedulers.boundedElastic()) + def head = new EthereumWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, false, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream) when: def act = head.listenNewHeads().blockFirst() @@ -107,7 +111,7 @@ class EthereumWsHeadSpec extends Specification { ] } - def head = new EthereumWsHead("fake", new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic()) + def head = new EthereumWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream) when: def act = head.getFlux() @@ -161,7 +165,7 @@ class EthereumWsHeadSpec extends Specification { ] } - def head = new EthereumWsHead("fake", new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic()) + def head = new EthereumWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream) when: def act = head.getFlux() @@ -201,7 +205,7 @@ class EthereumWsHeadSpec extends Specification { ] } - def head = new EthereumWsHead("fake", new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic()) + def head = new EthereumWsHead( new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream) when: def act = head.getFlux() @@ -240,7 +244,7 @@ class EthereumWsHeadSpec extends Specification { ] } - def head = new EthereumWsHead("fake", new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic()) + def head = new EthereumWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream) when: def act = head.getFlux() @@ -293,7 +297,7 @@ class EthereumWsHeadSpec extends Specification { ] } - def head = new EthereumWsHead("fake", new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic()) + def head = new EthereumWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream) when: def act = head.getFlux() diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplRealSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplRealSpec.groovy index e9a0b79af..c8fb2ffb4 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplRealSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplRealSpec.groovy @@ -1,7 +1,9 @@ package io.emeraldpay.dshackle.upstream.ethereum import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.test.EthereumPosRpcUpstreamMock import io.emeraldpay.dshackle.test.MockWSServer +import io.emeraldpay.dshackle.test.TestingCommons import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest @@ -13,6 +15,7 @@ import spock.lang.Specification import java.time.Duration class WsConnectionImplRealSpec extends Specification { + DefaultUpstream upstream = new EthereumPosRpcUpstreamMock(Chain.ETHEREUM__MAINNET, TestingCommons.api()) static SLEEP = 500 @@ -42,7 +45,7 @@ class WsConnectionImplRealSpec extends Specification { "http://localhost:${port}".toURI(), Schedulers.boundedElastic() ) - ).create(null).getConnection() + ).create(upstream).getConnection() } def cleanup() { diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplSpec.groovy index edaf98b47..2a8c55ff5 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplSpec.groovy @@ -17,7 +17,9 @@ package io.emeraldpay.dshackle.upstream.ethereum import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.test.EthereumPosRpcUpstreamMock import io.emeraldpay.dshackle.test.TestingCommons +import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import io.emeraldpay.etherjar.domain.TransactionId import io.emeraldpay.etherjar.rpc.RpcResponseError @@ -30,6 +32,7 @@ import spock.lang.Specification import java.time.Duration class WsConnectionImplSpec extends Specification { + DefaultUpstream upstream = new EthereumPosRpcUpstreamMock(Chain.ETHEREUM__MAINNET, TestingCommons.api()) def "Makes a RPC call"() { setup: @@ -46,7 +49,7 @@ class WsConnectionImplSpec extends Specification { ) def apiMock = TestingCommons.api() def wsApiMock = apiMock.asWebsocket() - def ws = wsf.create(null).getConnection() as WsConnectionImpl + def ws = wsf.create(upstream).getConnection() as WsConnectionImpl def tx = new TransactionJson().tap { hash = TransactionId.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200") @@ -81,7 +84,7 @@ class WsConnectionImplSpec extends Specification { ) def apiMock = TestingCommons.api() def wsApiMock = apiMock.asWebsocket() - def ws = wsf.create(null).getConnection() + def ws = wsf.create(upstream).getConnection() apiMock.answerOnce("eth_getTransactionByHash", ["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200"], null) @@ -114,7 +117,7 @@ class WsConnectionImplSpec extends Specification { ) def apiMock = TestingCommons.api() def wsApiMock = apiMock.asWebsocket() - def ws = wsf.create(null).getConnection() + def ws = wsf.create(upstream).getConnection() apiMock.answerOnce("eth_getTransactionByHash", ["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200"], new RpcResponseError(RpcResponseError.CODE_METHOD_NOT_EXIST, "test"))