Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate upstream settings periodically #316

Merged
merged 2 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@ import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR
import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR
import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_VALID
import io.emeraldpay.dshackle.upstream.ethereum.connectors.ConnectorFactory
import io.emeraldpay.dshackle.upstream.ethereum.connectors.EthereumConnector
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumLabelsDetector
import org.springframework.context.ApplicationEventPublisher
import org.springframework.context.Lifecycle
import reactor.core.Disposable
import reactor.core.publisher.Flux
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean

open class EthereumLikeRpcUpstream(
Expand All @@ -58,6 +63,7 @@ open class EthereumLikeRpcUpstream(

private var validatorSubscription: Disposable? = null
private var livenessSubscription: Disposable? = null
private var validationSettingsSubscription: Disposable? = null

override fun getCapabilities(): Set<Capability> {
return if (hasLiveSubscriptionHead.get()) {
Expand All @@ -76,11 +82,53 @@ open class EthereumLikeRpcUpstream(
override fun start() {
log.info("Configured for ${chain.chainName}")
connector.start()
if (!getOptions().disableUpstreamValidation && !validator.validateUpstreamSettings()) {
connector.stop()
log.warn("Upstream ${getId()} couldn't start, invalid upstream settings")
return
val validSettingsResult = validator.validateUpstreamSettingsOnStartup()
when (validSettingsResult) {
UPSTREAM_FATAL_SETTINGS_ERROR -> {
connector.stop()
log.warn("Upstream ${getId()} couldn't start, invalid upstream settings")
return
}
UPSTREAM_SETTINGS_ERROR -> {
validateUpstreamSettings()
}
else -> {
upstreamStart()
labelsDetector.detectLabels()
.toStream()
.forEach { updateLabels(it) }
}
}
}

private fun validateUpstreamSettings() {
validationSettingsSubscription = Flux.interval(
Duration.ofSeconds(10),
Duration.ofSeconds(20),
).flatMap {
validator.validateUpstreamSettings()
}.subscribe {
when (it) {
UPSTREAM_FATAL_SETTINGS_ERROR -> {
connector.stop()
log.warn("Upstream ${getId()} couldn't start, invalid upstream settings")
disposeValidationSettingsSubscription()
}
UPSTREAM_VALID -> {
upstreamStart()
labelsDetector.detectLabels()
.subscribe { label -> updateLabels(label) }
eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UpstreamChangeEvent.ChangeType.ADDED))
disposeValidationSettingsSubscription()
}
else -> {
log.warn("Continue validation of upstream ${getId()}")
}
}
}
}

private fun upstreamStart() {
if (getOptions().disableValidation) {
log.warn("Disable validation for upstream ${this.getId()}")
this.setLag(0)
Expand All @@ -96,14 +144,13 @@ open class EthereumLikeRpcUpstream(
}, {
log.debug("Error while checking live subscription for ${getId()}", it)
},)
labelsDetector.detectLabels()
.toStream()
.forEach {
log.info("Detected label ${it.first} with value ${it.second} for upstream ${getId()}")
node?.labels?.let { labels ->
labels[it.first] = it.second
}
}
}

private fun updateLabels(label: Pair<String, String>) {
log.info("Detected label ${label.first} with value ${label.second} for upstream ${getId()}")
node?.labels?.let { labels ->
labels[label.first] = label.second
}
}

override fun getIngressSubscription(): EthereumIngressSubscription {
Expand All @@ -128,11 +175,12 @@ open class EthereumLikeRpcUpstream(
validatorSubscription = null
livenessSubscription?.dispose()
livenessSubscription = null
disposeValidationSettingsSubscription()
connector.stop()
}

override fun isRunning(): Boolean {
return connector.isRunning()
return connector.isRunning() && validationSettingsSubscription == null
}

override fun getIngressReader(): JsonRpcReader {
Expand All @@ -150,4 +198,9 @@ open class EthereumLikeRpcUpstream(
}
return this as T
}

private fun disposeValidationSettingsSubscription() {
validationSettingsSubscription?.dispose()
validationSettingsSubscription = null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR
import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR
import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_VALID
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
import io.emeraldpay.etherjar.domain.Address
Expand Down Expand Up @@ -137,19 +140,26 @@ open class EthereumUpstreamValidator @JvmOverloads constructor(
}
}

fun validateUpstreamSettings(): Boolean {
fun validateUpstreamSettingsOnStartup(): ValidateUpstreamSettingsResult {
return validateUpstreamSettings().block() ?: UPSTREAM_FATAL_SETTINGS_ERROR
}

fun validateUpstreamSettings(): Mono<ValidateUpstreamSettingsResult> {
if (options.disableUpstreamValidation) {
return Mono.just(UPSTREAM_VALID)
}
return Mono.zip(
validateChain(),
validateCallLimit(),
validateOldBlocks(),
).map {
it.t1 && it.t2 && it.t3
}.block() ?: false
listOf(it.t1, it.t2, it.t3).sorted().last()
}
}

private fun validateChain(): Mono<Boolean> {
private fun validateChain(): Mono<ValidateUpstreamSettingsResult> {
if (!options.validateChain) {
return Mono.just(true)
return Mono.just(UPSTREAM_VALID)
}
return Mono.zip(
chainId(),
Expand All @@ -167,17 +177,21 @@ open class EthereumUpstreamValidator @JvmOverloads constructor(
)
}

isChainValid
if (isChainValid) {
UPSTREAM_VALID
} else {
UPSTREAM_FATAL_SETTINGS_ERROR
}
}
.onErrorResume {
log.error("Error during chain validation", it)
Mono.just(false)
Mono.just(UPSTREAM_SETTINGS_ERROR)
}
}

private fun validateCallLimit(): Mono<Boolean> {
private fun validateCallLimit(): Mono<ValidateUpstreamSettingsResult> {
if (!options.validateCallLimit || callLimitContract == null) {
return Mono.just(true)
return Mono.just(UPSTREAM_VALID)
}
return upstream.getIngressReader()
.read(
Expand All @@ -195,15 +209,15 @@ open class EthereumUpstreamValidator @JvmOverloads constructor(
),
)
.flatMap(JsonRpcResponse::requireResult)
.map { true }
.map { UPSTREAM_VALID }
.onErrorResume {
if (it.message != null && it.message!!.contains("rpc.returndata.limit")) {
log.warn(
"Error: ${it.message}. Node ${upstream.getId()} is probably incorrectly configured. " +
"You need to set up your return limit to at least 1_100_000. " +
"Erigon config example: https://github.com/ledgerwatch/erigon/blob/d014da4dc039ea97caf04ed29feb2af92b7b129d/cmd/utils/flags.go#L369",
)
Mono.just(false)
Mono.just(UPSTREAM_FATAL_SETTINGS_ERROR)
} else {
Mono.error(it)
}
Expand All @@ -219,10 +233,10 @@ open class EthereumUpstreamValidator @JvmOverloads constructor(
"message ${ctx.exception().message}",
)
}
.onErrorReturn(false)
.onErrorReturn(UPSTREAM_SETTINGS_ERROR)
}

private fun validateOldBlocks(): Mono<Boolean> {
private fun validateOldBlocks(): Mono<ValidateUpstreamSettingsResult> {
return EthereumArchiveBlockNumberReader(upstream.getIngressReader())
.readArchiveBlock()
.flatMap {
Expand All @@ -243,11 +257,11 @@ open class EthereumUpstreamValidator @JvmOverloads constructor(
"Node ${upstream.getId()} probably is synced incorrectly, it is not possible to get old blocks",
)
}
true
UPSTREAM_VALID
}
.onErrorResume {
log.warn("Error during old blocks validation", it)
Mono.just(true)
Mono.just(UPSTREAM_VALID)
}
}

Expand Down Expand Up @@ -276,4 +290,10 @@ open class EthereumUpstreamValidator @JvmOverloads constructor(
.doOnError { log.error("Error during execution 'net_version' - ${it.message} for ${upstream.getId()}") }
.flatMap(JsonRpcResponse::requireStringResult)
}

enum class ValidateUpstreamSettingsResult {
UPSTREAM_VALID,
UPSTREAM_SETTINGS_ERROR,
UPSTREAM_FATAL_SETTINGS_ERROR,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ import java.time.Duration
import static io.emeraldpay.dshackle.Chain.ETHEREUM__MAINNET
import static io.emeraldpay.dshackle.Chain.OPTIMISM__MAINNET
import static io.emeraldpay.dshackle.upstream.UpstreamAvailability.*
import static io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR
import static io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR
import static io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstreamValidator.ValidateUpstreamSettingsResult.UPSTREAM_VALID
import static java.util.Collections.emptyList

class EthereumUpstreamValidatorSpec extends Specification {
Expand All @@ -45,18 +48,18 @@ class EthereumUpstreamValidatorSpec extends Specification {
setup:
def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, Stub(EthereumLikeUpstream), ChainOptions.PartialOptions.getDefaults().buildOptions())
expect:
validator.resolve(Tuples.of(sync, peers, call)) == exp
validator.resolve(Tuples.of(sync, peers)) == exp
where:
exp | sync | peers | call
OK | OK | OK | OK
IMMATURE | OK | IMMATURE | OK
UNAVAILABLE | OK | UNAVAILABLE | OK
SYNCING | SYNCING | OK | OK
SYNCING | SYNCING | IMMATURE | OK
UNAVAILABLE | SYNCING | UNAVAILABLE | OK
UNAVAILABLE | UNAVAILABLE | OK | OK
UNAVAILABLE | UNAVAILABLE | IMMATURE | OK
UNAVAILABLE | UNAVAILABLE | UNAVAILABLE | OK
exp | sync | peers
OK | OK | OK
IMMATURE | OK | IMMATURE
UNAVAILABLE | OK | UNAVAILABLE
SYNCING | SYNCING | OK
SYNCING | SYNCING | IMMATURE
UNAVAILABLE | SYNCING | UNAVAILABLE
UNAVAILABLE | UNAVAILABLE | OK
UNAVAILABLE | UNAVAILABLE | IMMATURE
UNAVAILABLE | UNAVAILABLE | UNAVAILABLE
}

def "Doesnt check eth_syncing when disabled"() {
Expand Down Expand Up @@ -280,9 +283,9 @@ class EthereumUpstreamValidatorSpec extends Specification {
def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options)

when:
def act = validator.validateUpstreamSettings()
def act = validator.validateUpstreamSettingsOnStartup()
then:
act
act == UPSTREAM_VALID
}

def "Upstream is valid if not error from call limit check"() {
Expand All @@ -304,9 +307,9 @@ class EthereumUpstreamValidatorSpec extends Specification {
def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96")

when:
def act = validator.validateUpstreamSettings()
def act = validator.validateUpstreamSettingsOnStartup()
then:
act
act == UPSTREAM_VALID
}

def "Upstream is not valid if error returned on call limit check"() {
Expand All @@ -328,9 +331,9 @@ class EthereumUpstreamValidatorSpec extends Specification {
def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96")

when:
def act = validator.validateUpstreamSettings()
def act = validator.validateUpstreamSettingsOnStartup()
then:
!act
act == UPSTREAM_SETTINGS_ERROR
}

def "Upstream is valid if chain settings are valid"() {
Expand All @@ -350,9 +353,9 @@ class EthereumUpstreamValidatorSpec extends Specification {
def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96")

when:
def act = validator.validateUpstreamSettings()
def act = validator.validateUpstreamSettingsOnStartup()
then:
act
act == UPSTREAM_VALID
}

def "Upstream is not valid - specified optimism but got ethereum"() {
Expand All @@ -372,9 +375,9 @@ class EthereumUpstreamValidatorSpec extends Specification {
def validator = new EthereumUpstreamValidator(OPTIMISM__MAINNET, up, options, "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96")

when:
def act = validator.validateUpstreamSettings()
def act = validator.validateUpstreamSettingsOnStartup()
then:
!act
act == UPSTREAM_FATAL_SETTINGS_ERROR
}

def "Upstream is valid if all setting are valid"() {
Expand All @@ -396,9 +399,9 @@ class EthereumUpstreamValidatorSpec extends Specification {
def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96")

when:
def act = validator.validateUpstreamSettings()
def act = validator.validateUpstreamSettingsOnStartup()
then:
act
act == UPSTREAM_VALID
}

def "Upstream is not valid if there are errors"() {
Expand All @@ -420,9 +423,9 @@ class EthereumUpstreamValidatorSpec extends Specification {
def validator = new EthereumUpstreamValidator(ETHEREUM__MAINNET, up, options, "0x32268860cAAc2948Ab5DdC7b20db5a420467Cf96")

when:
def act = validator.validateUpstreamSettings()
def act = validator.validateUpstreamSettingsOnStartup()
then:
!act
act == UPSTREAM_SETTINGS_ERROR
}


Expand Down
Loading