Skip to content

Commit

Permalink
Fix setting unavailable status (#308)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Sep 26, 2023
1 parent 8e26d4a commit be97ba0
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ class EthereumWsConnectionPoolFactory(
private val ethereumWsConnectionFactory: EthereumWsConnectionFactory
) {

fun create(upstream: DefaultUpstream?): WsConnectionPool {
require(upstream == null || upstream.getId() == id) {
"Creating instance for different upstream. ${upstream?.getId()} != id"
fun create(upstream: DefaultUpstream): WsConnectionPool {
require(upstream.getId() == id) {
"Creating instance for different upstream. ${upstream.getId()} != id"
}
return if (connections > 1) {
WsConnectionMultiPool(ethereumWsConnectionFactory, upstream, connections)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.reader.Reader
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
Expand All @@ -38,15 +40,15 @@ import reactor.core.scheduler.Scheduler
import java.time.Duration

class EthereumWsHead(
upstreamId: String,
forkChoice: ForkChoice,
blockValidator: BlockValidator,
private val api: JsonRpcReader,
private val wsSubscriptions: WsSubscriptions,
private val skipEnhance: Boolean,
private val wsConnectionResubscribeScheduler: Scheduler,
private val headScheduler: Scheduler,
) : DefaultEthereumHead(upstreamId, forkChoice, blockValidator, headScheduler), Lifecycle {
private val upstream: DefaultUpstream
) : DefaultEthereumHead(upstream.getId(), forkChoice, blockValidator, headScheduler), Lifecycle {

private var connectionId: String? = null
private var subscribed = false
Expand Down Expand Up @@ -89,6 +91,9 @@ class EthereumWsHead(

fun listenNewHeads(): Flux<BlockContainer> {
return subscribe()
.transform {
Flux.concat(it.next().doOnNext { upstream.setStatus(UpstreamAvailability.OK) }, it)
}
.map {
val block = Global.objectMapper.readValue(it, BlockJson::class.java) as BlockJson<TransactionRefJson>
if (!block.checkExtraData() && skipEnhance) {
Expand Down Expand Up @@ -137,6 +142,8 @@ class EthereumWsHead(
}
.timeout(Duration.ofSeconds(60), Mono.error(RuntimeException("No response from subscribe to newHeads")))
.onErrorResume {
log.error("Error getting heads for $upstreamId - ${it.message}")
upstream.setStatus(UpstreamAvailability.UNAVAILABLE)
subscribed = false
Mono.empty()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import kotlin.concurrent.write
*/
class WsConnectionMultiPool(
private val ethereumWsConnectionFactory: EthereumWsConnectionFactory,
private val upstream: DefaultUpstream?,
private val upstream: DefaultUpstream,
private val connections: Int,
) : WsConnectionPool {

Expand Down Expand Up @@ -112,7 +112,7 @@ class WsConnectionMultiPool(
current.add(
ethereumWsConnectionFactory.createWsConnection(connIndex++) {
if (isUnavailable()) {
upstream?.setStatus(UpstreamAvailability.UNAVAILABLE)
upstream.setStatus(UpstreamAvailability.UNAVAILABLE)
}
}.also {
it.connect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import reactor.core.publisher.Flux

class WsConnectionSinglePool(
ethereumWsConnectionFactory: EthereumWsConnectionFactory,
private val upstream: DefaultUpstream?,
private val upstream: DefaultUpstream,
) : WsConnectionPool {
private val connection = ethereumWsConnectionFactory.createWsConnection {
upstream?.setStatus(UpstreamAvailability.UNAVAILABLE)
upstream.setStatus(UpstreamAvailability.UNAVAILABLE)
}

override fun connect() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ open class EthereumConnectorFactory(
connectorType,
httpFactory.create(upstream.getId(), chain),
wsFactory,
upstream.getId(),
upstream,
forkChoice,
blockValidator,
skipEnhance,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import io.emeraldpay.dshackle.cache.Caches
import io.emeraldpay.dshackle.cache.CachesEnabled
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.MergedHead
Expand Down Expand Up @@ -31,14 +32,15 @@ class EthereumRpcConnector(
connectorType: ConnectorMode,
private val directReader: JsonRpcReader,
wsFactory: EthereumWsConnectionPoolFactory?,
id: String,
upstream: DefaultUpstream,
forkChoice: ForkChoice,
blockValidator: BlockValidator,
skipEnhance: Boolean,
wsConnectionResubscribeScheduler: Scheduler,
headScheduler: Scheduler,
expectedBlockTime: Duration
) : EthereumConnector, CachesEnabled {
private val id = upstream.getId()
private val pool: WsConnectionPool?
private val head: Head
private val liveness: HeadLivenessValidator
Expand All @@ -52,7 +54,7 @@ class EthereumRpcConnector(
}

init {
pool = wsFactory?.create(null)
pool = wsFactory?.create(upstream)

head = when (connectorType) {
RPC_ONLY -> {
Expand All @@ -67,14 +69,14 @@ class EthereumRpcConnector(
RPC_REQUESTS_WITH_MIXED_HEAD -> {
val wsHead =
EthereumWsHead(
id,
AlwaysForkChoice(),
blockValidator,
getIngressReader(),
WsSubscriptionsImpl(pool!!),
skipEnhance,
wsConnectionResubscribeScheduler,
headScheduler
headScheduler,
upstream
)
// receive all new blocks through WebSockets, but also periodically verify with RPC in case if WS failed
val rpcHead =
Expand All @@ -91,11 +93,11 @@ class EthereumRpcConnector(

RPC_REQUESTS_WITH_WS_HEAD -> {
EthereumWsHead(
id,
AlwaysForkChoice(),
blockValidator, getIngressReader(),
WsSubscriptionsImpl(pool!!), skipEnhance, wsConnectionResubscribeScheduler,
headScheduler
headScheduler,
upstream
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ class EthereumWsConnector(
reader = JsonRpcWsClient(pool)
val wsSubscriptions = WsSubscriptionsImpl(pool)
head = EthereumWsHead(
upstream.getId(),
forkChoice,
blockValidator,
reader,
wsSubscriptions,
skipEnhance,
wsConnectionResubscribeScheduler,
headScheduler
headScheduler,
upstream
)
liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler, upstream.getId())
subscriptions = EthereumWsIngressSubscription(wsSubscriptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
*/
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.test.EthereumPosRpcUpstreamMock
import io.emeraldpay.dshackle.test.TestingCommons
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson
import io.emeraldpay.dshackle.upstream.forkchoice.AlwaysForkChoice
import io.emeraldpay.etherjar.domain.BlockHash
import io.emeraldpay.etherjar.domain.TransactionId
import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson
import io.emeraldpay.etherjar.rpc.json.TransactionRefJson
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
Expand All @@ -38,6 +41,7 @@ import java.time.temporal.ChronoUnit
class EthereumWsHeadSpec extends Specification {

BlockHash parent = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")
DefaultUpstream upstream = new EthereumPosRpcUpstreamMock(Chain.ETHEREUM__MAINNET, TestingCommons.api())

def "Fetch block"() {
setup:
Expand Down Expand Up @@ -66,7 +70,7 @@ class EthereumWsHeadSpec extends Specification {
1 * it.connectionInfoFlux() >> Flux.empty()
}

def head = new EthereumWsHead("fake", new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, false, Schedulers.boundedElastic(), Schedulers.boundedElastic())
def head = new EthereumWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, false, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream)

when:
def act = head.listenNewHeads().blockFirst()
Expand Down Expand Up @@ -107,7 +111,7 @@ class EthereumWsHeadSpec extends Specification {
]
}

def head = new EthereumWsHead("fake", new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic())
def head = new EthereumWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream)

when:
def act = head.getFlux()
Expand Down Expand Up @@ -161,7 +165,7 @@ class EthereumWsHeadSpec extends Specification {
]
}

def head = new EthereumWsHead("fake", new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic())
def head = new EthereumWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream)

when:
def act = head.getFlux()
Expand Down Expand Up @@ -201,7 +205,7 @@ class EthereumWsHeadSpec extends Specification {
]
}

def head = new EthereumWsHead("fake", new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic())
def head = new EthereumWsHead( new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream)

when:
def act = head.getFlux()
Expand Down Expand Up @@ -240,7 +244,7 @@ class EthereumWsHeadSpec extends Specification {
]
}

def head = new EthereumWsHead("fake", new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic())
def head = new EthereumWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream)

when:
def act = head.getFlux()
Expand Down Expand Up @@ -293,7 +297,7 @@ class EthereumWsHeadSpec extends Specification {
]
}

def head = new EthereumWsHead("fake", new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic())
def head = new EthereumWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, true, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream)

when:
def act = head.getFlux()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.test.EthereumPosRpcUpstreamMock
import io.emeraldpay.dshackle.test.MockWSServer
import io.emeraldpay.dshackle.test.TestingCommons
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
Expand All @@ -13,6 +15,7 @@ import spock.lang.Specification
import java.time.Duration

class WsConnectionImplRealSpec extends Specification {
DefaultUpstream upstream = new EthereumPosRpcUpstreamMock(Chain.ETHEREUM__MAINNET, TestingCommons.api())

static SLEEP = 500

Expand Down Expand Up @@ -42,7 +45,7 @@ class WsConnectionImplRealSpec extends Specification {
"http://localhost:${port}".toURI(),
Schedulers.boundedElastic()
)
).create(null).getConnection()
).create(upstream).getConnection()
}

def cleanup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.test.EthereumPosRpcUpstreamMock
import io.emeraldpay.dshackle.test.TestingCommons
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.etherjar.domain.TransactionId
import io.emeraldpay.etherjar.rpc.RpcResponseError
Expand All @@ -30,6 +32,7 @@ import spock.lang.Specification
import java.time.Duration

class WsConnectionImplSpec extends Specification {
DefaultUpstream upstream = new EthereumPosRpcUpstreamMock(Chain.ETHEREUM__MAINNET, TestingCommons.api())

def "Makes a RPC call"() {
setup:
Expand All @@ -46,7 +49,7 @@ class WsConnectionImplSpec extends Specification {
)
def apiMock = TestingCommons.api()
def wsApiMock = apiMock.asWebsocket()
def ws = wsf.create(null).getConnection() as WsConnectionImpl
def ws = wsf.create(upstream).getConnection() as WsConnectionImpl

def tx = new TransactionJson().tap {
hash = TransactionId.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")
Expand Down Expand Up @@ -81,7 +84,7 @@ class WsConnectionImplSpec extends Specification {
)
def apiMock = TestingCommons.api()
def wsApiMock = apiMock.asWebsocket()
def ws = wsf.create(null).getConnection()
def ws = wsf.create(upstream).getConnection()

apiMock.answerOnce("eth_getTransactionByHash", ["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200"], null)

Expand Down Expand Up @@ -114,7 +117,7 @@ class WsConnectionImplSpec extends Specification {
)
def apiMock = TestingCommons.api()
def wsApiMock = apiMock.asWebsocket()
def ws = wsf.create(null).getConnection()
def ws = wsf.create(upstream).getConnection()

apiMock.answerOnce("eth_getTransactionByHash", ["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200"],
new RpcResponseError(RpcResponseError.CODE_METHOD_NOT_EXIST, "test"))
Expand Down

0 comments on commit be97ba0

Please sign in to comment.