Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some tunes #549

Merged
merged 7 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ jib {
}
}
container {
jvmFlags = ['-XX:+UseG1GC', '-XX:+ExitOnOutOfMemoryError', '-Xms1024M', '-XX:+UnlockDiagnosticVMOptions', '-XX:GCLockerRetryAllocationCount=10', '--enable-preview']
jvmFlags = ['-XX:+UseG1GC', '-XX:+ExitOnOutOfMemoryError', '-Xms1024M', '-XX:NativeMemoryTracking=summary', '-XX:+UnlockDiagnosticVMOptions', '-XX:GCLockerRetryAllocationCount=10', '--enable-preview']
mainClass = 'io.emeraldpay.dshackle.StarterKt'
args = []
ports = ['2448', '2449', '8545']
Expand Down
1 change: 1 addition & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Defaults {
const val maxMetadataSize = 16384
val timeout: Duration = Duration.ofSeconds(60)
val timeoutInternal: Duration = timeout.dividedBy(4)
val internalCallsTimeout = Duration.ofSeconds(3)
val retryConnection: Duration = Duration.ofSeconds(10)
val grpcServerKeepAliveTime: Long = 15 // seconds
val grpcServerKeepAliveTimeout: Long = 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import java.util.concurrent.Executor
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

@Configuration
open class SchedulersConfig {
Expand Down Expand Up @@ -79,17 +81,24 @@ open class SchedulersConfig {
}

private fun makePool(name: String, size: Int, monitoringConfig: MonitoringConfig): ExecutorService {
val pool = Executors.newFixedThreadPool(size, CustomizableThreadFactory("$name-"))
val cachedPool = ThreadPoolExecutor(
size,
size * threadsMultiplier,
60L,
TimeUnit.SECONDS,
SynchronousQueue(),
CustomizableThreadFactory("$name-"),
)

return if (monitoringConfig.enableExtended) {
ExecutorServiceMetrics.monitor(
Metrics.globalRegistry,
pool,
cachedPool,
name,
Tag.of("reactor_scheduler_id", "_"),
)
} else {
pool
cachedPool
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,11 @@ class QuorumRequestReader(
src.onErrorResume { err ->
errorHandler.handle(api, key, err.message)

val msgError = "Error during call upstream ${api.getId()} with method ${key.method}"
val msgError = "Error during call upstream ${api.getId()} with method ${key.method}, reason - ${err.message}"
if (err is ChainCallUpstreamException) {
log.debug(msgError, err)
log.debug(msgError)
} else {
log.warn(msgError, err)
log.warn(msgError)
}

// when the call failed with an error we want to notify the quorum because
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class CompoundReader<K, D> (
.flatMap({ rdr ->
rdr.read(key)
.timeout(Defaults.timeoutInternal, Mono.empty())
.doOnError { t -> log.warn("Failed to read from $rdr", t) }
.doOnError { t -> log.warn("Failed to read from {}, reason - {}", rdr, t.message) }
}, 1,)
.next()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import java.io.ByteArrayInputStream
import java.security.KeyStore
import java.security.cert.CertificateFactory
import java.security.cert.X509Certificate
import java.time.Duration
import java.util.Base64
import java.util.function.Consumer
import java.util.function.Function
Expand All @@ -31,7 +32,8 @@ abstract class HttpReader(
init {
val connectionProvider = ConnectionProvider.builder("dshackleConnectionPool")
.maxConnections(1500)
.pendingAcquireMaxCount(10000)
.pendingAcquireMaxCount(1000)
.pendingAcquireTimeout(Duration.ofSeconds(10))
.build()

var build = HttpClient.create(connectionProvider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.module.kotlin.readValue
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Defaults.Companion.internalCallsTimeout
import io.emeraldpay.dshackle.Global
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
Expand All @@ -17,15 +18,25 @@ abstract class UpstreamSettingsDetector(
) {
protected val log = LoggerFactory.getLogger(this::class.java)

abstract fun detectLabels(): Flux<Pair<String, String>>
fun detectLabels(): Flux<Pair<String, String>> {
return internalDetectLabels()
.timeout(internalCallsTimeout)
.onErrorResume {
log.warn("Couldn't detect lables of upstream {}, message - {}", upstream.getId(), it.message)
Flux.empty()
}
}

protected abstract fun internalDetectLabels(): Flux<Pair<String, String>>

fun detectClientVersion(): Mono<String> {
return upstream.getIngressReader()
.read(clientVersionRequest())
.flatMap(ChainResponse::requireResult)
.map(::parseClientVersion)
.timeout(internalCallsTimeout)
.onErrorResume {
log.warn("Can't detect the client version of upstream ${upstream.getId()}, reason - {}", it.message)
log.warn("Can't detect the client version of upstream {}, reason - {}", upstream.getId(), it.message)
Mono.just(UNKNOWN_CLIENT_VERSION)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class BeaconChainUpstreamSettingsDetector(
)
}

override fun detectLabels(): Flux<Pair<String, String>> {
override fun internalDetectLabels(): Flux<Pair<String, String>> {
return Flux.merge(
detectNodeType(),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.emeraldpay.dshackle.upstream.cosmos

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
Expand Down Expand Up @@ -31,10 +32,13 @@ class CosmosLowerBoundStateDetector(
}

override fun internalDetectLowerBound(): Flux<LowerBoundData> {
return upstream.getIngressReader().read(ChainRequest("status", ListParams())).map {
val resp = Global.objectMapper.readValue(it.getResult(), CosmosStatus::class.java)
LowerBoundData(resp.syncInfo.earliestBlockHeight.toLong(), STATE)
}.toFlux()
return upstream.getIngressReader()
.read(ChainRequest("status", ListParams()))
.timeout(Defaults.internalCallsTimeout)
.map {
val resp = Global.objectMapper.readValue(it.getResult(), CosmosStatus::class.java)
LowerBoundData(resp.syncInfo.earliestBlockHeight.toLong(), STATE)
}.toFlux()
}

override fun types(): Set<LowerBoundType> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import reactor.core.publisher.Flux
class CosmosUpstreamSettingsDetector(
upstream: Upstream,
) : BasicUpstreamSettingsDetector(upstream) {
override fun detectLabels(): Flux<Pair<String, String>> {
override fun internalDetectLabels(): Flux<Pair<String, String>> {
return Flux.merge(
detectNodeType(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.emeraldpay.dshackle.upstream.ethereum

import com.fasterxml.jackson.module.kotlin.readValue
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
Expand Down Expand Up @@ -72,6 +73,7 @@ class EthereumFinalizationDetector : FinalizationDetector {
upstream
.getIngressReader()
.read(req)
.timeout(Defaults.internalCallsTimeout)
.flatMap {
it.requireResult().flatMap { result ->
val block = Global.objectMapper.readValue<BlockJson<TransactionRefJson>>(result)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.upstream.ChainCallError
import io.emeraldpay.dshackle.upstream.ChainCallUpstreamException
import io.emeraldpay.dshackle.upstream.ChainRequest
Expand Down Expand Up @@ -45,6 +46,7 @@ class EthereumLowerBoundBlockDetector(
ListParams(block.toHex(), false),
),
)
.timeout(Defaults.internalCallsTimeout)
.doOnNext {
if (it.hasResult() && it.getResult().contentEquals("null".toByteArray())) {
throw IllegalStateException(NO_BLOCK_DATA)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
Expand Down Expand Up @@ -39,6 +40,7 @@ class EthereumLowerBoundLogsDetector(
),
),
)
.timeout(Defaults.internalCallsTimeout)
.doOnNext {
if (it.hasResult() && (it.getResult().contentEquals("null".toByteArray()) || it.getResult().contentEquals("[]".toByteArray()))) {
throw IllegalStateException(NO_LOGS_DATA)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.ChainResponse
Expand Down Expand Up @@ -60,7 +61,7 @@ class EthereumLowerBoundStateDetector(
"eth_getBalance",
ListParams(ZERO_ADDRESS, block.toHex()),
),
)
).timeout(Defaults.internalCallsTimeout)
}.doOnNext {
if (it.hasResult() && it.getResult().contentEquals(Global.nullValue)) {
throw IllegalStateException("No state data")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
Expand Down Expand Up @@ -32,6 +33,7 @@ class EthereumLowerBoundTxDetector(
.read(
ChainRequest("eth_getBlockByNumber", ListParams(block.toHex(), false)),
)
.timeout(Defaults.internalCallsTimeout)
.doOnNext {
if (it.hasResult() && it.getResult().contentEquals("null".toByteArray())) {
throw IllegalStateException(NO_TX_DATA)
Expand All @@ -50,6 +52,7 @@ class EthereumLowerBoundTxDetector(
.read(
ChainRequest("eth_getTransactionByHash", ListParams(tx)),
)
.timeout(Defaults.internalCallsTimeout)
.doOnNext {
if (it.hasResult() && it.getResult().contentEquals("null".toByteArray())) {
throw IllegalStateException(NO_TX_DATA)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import kotlin.text.toBigInteger

const val ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"

Expand All @@ -21,7 +20,7 @@ class EthereumUpstreamSettingsDetector(
) : BasicEthUpstreamSettingsDetector(_upstream) {
private val blockNumberReader = EthereumArchiveBlockNumberReader(upstream.getIngressReader())

override fun detectLabels(): Flux<Pair<String, String>> {
override fun internalDetectLabels(): Flux<Pair<String, String>> {
return Flux.merge(
detectNodeType(),
detectArchiveNode(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.emeraldpay.dshackle.upstream.generic

import io.emeraldpay.dshackle.Defaults.Companion.internalCallsTimeout
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.upstream.SingleValidator
import io.emeraldpay.dshackle.upstream.Upstream
Expand All @@ -21,8 +22,9 @@ class AggregatedUpstreamValidator(
) { a -> a.map { it as UpstreamAvailability } }
.map(::resolve)
.defaultIfEmpty(UpstreamAvailability.OK) // upstream is OK on case there are no validators
.timeout(internalCallsTimeout)
.onErrorResume {
log.error("Error during upstream validation for ${upstream.getId()}", it)
log.error("Error during upstream validation for {}, reason - {}", upstream.getId(), it.message)
Mono.just(UpstreamAvailability.UNAVAILABLE)
}
}
Expand All @@ -33,13 +35,16 @@ class AggregatedUpstreamValidator(
) { a -> a.map { it as ValidateUpstreamSettingsResult } }
.map(::resolve)
.defaultIfEmpty(ValidateUpstreamSettingsResult.UPSTREAM_VALID)
.timeout(internalCallsTimeout)
.onErrorResume {
log.error("Error during upstream validation for ${upstream.getId()}", it)
log.error("Error during upstream validation for {}, reason - {}", upstream.getId(), it.message)
Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR)
}
}

override fun validateUpstreamSettingsOnStartup(): ValidateUpstreamSettingsResult {
return validateUpstreamSettings().block() ?: ValidateUpstreamSettingsResult.UPSTREAM_VALID
return runCatching {
validateUpstreamSettings().block(internalCallsTimeout) ?: ValidateUpstreamSettingsResult.UPSTREAM_VALID
}.getOrDefault(ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@ class GenericRpcHead(
override fun start() {
super.start()
refreshSubscription?.dispose()

val base = Flux.interval(interval)
.onBackpressureDrop()
.publishOn(headScheduler)
.filter { !isSyncing }
.flatMap {
.concatMap {
getLatestBlock(api)
}

refreshSubscription = super.follow(base)
}

Expand Down
Loading
Loading