Skip to content

Commit

Permalink
Some tunes (#549)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Aug 27, 2024
1 parent 68344ce commit 8ac58f5
Show file tree
Hide file tree
Showing 26 changed files with 140 additions and 63 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ jib {
}
}
container {
jvmFlags = ['-XX:+UseG1GC', '-XX:+ExitOnOutOfMemoryError', '-Xms1024M', '-XX:+UnlockDiagnosticVMOptions', '-XX:GCLockerRetryAllocationCount=10', '--enable-preview']
jvmFlags = ['-XX:+UseG1GC', '-XX:+ExitOnOutOfMemoryError', '-Xms1024M', '-XX:NativeMemoryTracking=summary', '-XX:+UnlockDiagnosticVMOptions', '-XX:GCLockerRetryAllocationCount=10', '--enable-preview']
mainClass = 'io.emeraldpay.dshackle.StarterKt'
args = []
ports = ['2448', '2449', '8545']
Expand Down
1 change: 1 addition & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Defaults {
const val maxMetadataSize = 16384
val timeout: Duration = Duration.ofSeconds(60)
val timeoutInternal: Duration = timeout.dividedBy(4)
val internalCallsTimeout = Duration.ofSeconds(3)
val retryConnection: Duration = Duration.ofSeconds(10)
val grpcServerKeepAliveTime: Long = 15 // seconds
val grpcServerKeepAliveTimeout: Long = 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import java.util.concurrent.Executor
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

@Configuration
open class SchedulersConfig {
Expand Down Expand Up @@ -79,17 +81,24 @@ open class SchedulersConfig {
}

private fun makePool(name: String, size: Int, monitoringConfig: MonitoringConfig): ExecutorService {
val pool = Executors.newFixedThreadPool(size, CustomizableThreadFactory("$name-"))
val cachedPool = ThreadPoolExecutor(
size,
size * threadsMultiplier,
60L,
TimeUnit.SECONDS,
SynchronousQueue(),
CustomizableThreadFactory("$name-"),
)

return if (monitoringConfig.enableExtended) {
ExecutorServiceMetrics.monitor(
Metrics.globalRegistry,
pool,
cachedPool,
name,
Tag.of("reactor_scheduler_id", "_"),
)
} else {
pool
cachedPool
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,11 @@ class QuorumRequestReader(
src.onErrorResume { err ->
errorHandler.handle(api, key, err.message)

val msgError = "Error during call upstream ${api.getId()} with method ${key.method}"
val msgError = "Error during call upstream ${api.getId()} with method ${key.method}, reason - ${err.message}"
if (err is ChainCallUpstreamException) {
log.debug(msgError, err)
log.debug(msgError)
} else {
log.warn(msgError, err)
log.warn(msgError)
}

// when the call failed with an error we want to notify the quorum because
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class CompoundReader<K, D> (
.flatMap({ rdr ->
rdr.read(key)
.timeout(Defaults.timeoutInternal, Mono.empty())
.doOnError { t -> log.warn("Failed to read from $rdr", t) }
.doOnError { t -> log.warn("Failed to read from {}, reason - {}", rdr, t.message) }
}, 1,)
.next()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import java.io.ByteArrayInputStream
import java.security.KeyStore
import java.security.cert.CertificateFactory
import java.security.cert.X509Certificate
import java.time.Duration
import java.util.Base64
import java.util.function.Consumer
import java.util.function.Function
Expand All @@ -31,7 +32,8 @@ abstract class HttpReader(
init {
val connectionProvider = ConnectionProvider.builder("dshackleConnectionPool")
.maxConnections(1500)
.pendingAcquireMaxCount(10000)
.pendingAcquireMaxCount(1000)
.pendingAcquireTimeout(Duration.ofSeconds(10))
.build()

var build = HttpClient.create(connectionProvider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.module.kotlin.readValue
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Defaults.Companion.internalCallsTimeout
import io.emeraldpay.dshackle.Global
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
Expand All @@ -17,15 +18,25 @@ abstract class UpstreamSettingsDetector(
) {
protected val log = LoggerFactory.getLogger(this::class.java)

abstract fun detectLabels(): Flux<Pair<String, String>>
fun detectLabels(): Flux<Pair<String, String>> {
return internalDetectLabels()
.timeout(internalCallsTimeout)
.onErrorResume {
log.warn("Couldn't detect lables of upstream {}, message - {}", upstream.getId(), it.message)
Flux.empty()
}
}

protected abstract fun internalDetectLabels(): Flux<Pair<String, String>>

fun detectClientVersion(): Mono<String> {
return upstream.getIngressReader()
.read(clientVersionRequest())
.flatMap(ChainResponse::requireResult)
.map(::parseClientVersion)
.timeout(internalCallsTimeout)
.onErrorResume {
log.warn("Can't detect the client version of upstream ${upstream.getId()}, reason - {}", it.message)
log.warn("Can't detect the client version of upstream {}, reason - {}", upstream.getId(), it.message)
Mono.just(UNKNOWN_CLIENT_VERSION)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class BeaconChainUpstreamSettingsDetector(
)
}

override fun detectLabels(): Flux<Pair<String, String>> {
override fun internalDetectLabels(): Flux<Pair<String, String>> {
return Flux.merge(
detectNodeType(),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.emeraldpay.dshackle.upstream.cosmos

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
Expand Down Expand Up @@ -31,10 +32,13 @@ class CosmosLowerBoundStateDetector(
}

override fun internalDetectLowerBound(): Flux<LowerBoundData> {
return upstream.getIngressReader().read(ChainRequest("status", ListParams())).map {
val resp = Global.objectMapper.readValue(it.getResult(), CosmosStatus::class.java)
LowerBoundData(resp.syncInfo.earliestBlockHeight.toLong(), STATE)
}.toFlux()
return upstream.getIngressReader()
.read(ChainRequest("status", ListParams()))
.timeout(Defaults.internalCallsTimeout)
.map {
val resp = Global.objectMapper.readValue(it.getResult(), CosmosStatus::class.java)
LowerBoundData(resp.syncInfo.earliestBlockHeight.toLong(), STATE)
}.toFlux()
}

override fun types(): Set<LowerBoundType> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import reactor.core.publisher.Flux
class CosmosUpstreamSettingsDetector(
upstream: Upstream,
) : BasicUpstreamSettingsDetector(upstream) {
override fun detectLabels(): Flux<Pair<String, String>> {
override fun internalDetectLabels(): Flux<Pair<String, String>> {
return Flux.merge(
detectNodeType(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.emeraldpay.dshackle.upstream.ethereum

import com.fasterxml.jackson.module.kotlin.readValue
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
Expand Down Expand Up @@ -72,6 +73,7 @@ class EthereumFinalizationDetector : FinalizationDetector {
upstream
.getIngressReader()
.read(req)
.timeout(Defaults.internalCallsTimeout)
.flatMap {
it.requireResult().flatMap { result ->
val block = Global.objectMapper.readValue<BlockJson<TransactionRefJson>>(result)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.upstream.ChainCallError
import io.emeraldpay.dshackle.upstream.ChainCallUpstreamException
import io.emeraldpay.dshackle.upstream.ChainRequest
Expand Down Expand Up @@ -45,6 +46,7 @@ class EthereumLowerBoundBlockDetector(
ListParams(block.toHex(), false),
),
)
.timeout(Defaults.internalCallsTimeout)
.doOnNext {
if (it.hasResult() && it.getResult().contentEquals("null".toByteArray())) {
throw IllegalStateException(NO_BLOCK_DATA)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
Expand Down Expand Up @@ -39,6 +40,7 @@ class EthereumLowerBoundLogsDetector(
),
),
)
.timeout(Defaults.internalCallsTimeout)
.doOnNext {
if (it.hasResult() && (it.getResult().contentEquals("null".toByteArray()) || it.getResult().contentEquals("[]".toByteArray()))) {
throw IllegalStateException(NO_LOGS_DATA)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.ChainResponse
Expand Down Expand Up @@ -60,7 +61,7 @@ class EthereumLowerBoundStateDetector(
"eth_getBalance",
ListParams(ZERO_ADDRESS, block.toHex()),
),
)
).timeout(Defaults.internalCallsTimeout)
}.doOnNext {
if (it.hasResult() && it.getResult().contentEquals(Global.nullValue)) {
throw IllegalStateException("No state data")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
Expand Down Expand Up @@ -32,6 +33,7 @@ class EthereumLowerBoundTxDetector(
.read(
ChainRequest("eth_getBlockByNumber", ListParams(block.toHex(), false)),
)
.timeout(Defaults.internalCallsTimeout)
.doOnNext {
if (it.hasResult() && it.getResult().contentEquals("null".toByteArray())) {
throw IllegalStateException(NO_TX_DATA)
Expand All @@ -50,6 +52,7 @@ class EthereumLowerBoundTxDetector(
.read(
ChainRequest("eth_getTransactionByHash", ListParams(tx)),
)
.timeout(Defaults.internalCallsTimeout)
.doOnNext {
if (it.hasResult() && it.getResult().contentEquals("null".toByteArray())) {
throw IllegalStateException(NO_TX_DATA)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import kotlin.text.toBigInteger

const val ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"

Expand All @@ -21,7 +20,7 @@ class EthereumUpstreamSettingsDetector(
) : BasicEthUpstreamSettingsDetector(_upstream) {
private val blockNumberReader = EthereumArchiveBlockNumberReader(upstream.getIngressReader())

override fun detectLabels(): Flux<Pair<String, String>> {
override fun internalDetectLabels(): Flux<Pair<String, String>> {
return Flux.merge(
detectNodeType(),
detectArchiveNode(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.emeraldpay.dshackle.upstream.generic

import io.emeraldpay.dshackle.Defaults.Companion.internalCallsTimeout
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.upstream.SingleValidator
import io.emeraldpay.dshackle.upstream.Upstream
Expand All @@ -21,8 +22,9 @@ class AggregatedUpstreamValidator(
) { a -> a.map { it as UpstreamAvailability } }
.map(::resolve)
.defaultIfEmpty(UpstreamAvailability.OK) // upstream is OK on case there are no validators
.timeout(internalCallsTimeout)
.onErrorResume {
log.error("Error during upstream validation for ${upstream.getId()}", it)
log.error("Error during upstream validation for {}, reason - {}", upstream.getId(), it.message)
Mono.just(UpstreamAvailability.UNAVAILABLE)
}
}
Expand All @@ -33,13 +35,16 @@ class AggregatedUpstreamValidator(
) { a -> a.map { it as ValidateUpstreamSettingsResult } }
.map(::resolve)
.defaultIfEmpty(ValidateUpstreamSettingsResult.UPSTREAM_VALID)
.timeout(internalCallsTimeout)
.onErrorResume {
log.error("Error during upstream validation for ${upstream.getId()}", it)
log.error("Error during upstream validation for {}, reason - {}", upstream.getId(), it.message)
Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR)
}
}

override fun validateUpstreamSettingsOnStartup(): ValidateUpstreamSettingsResult {
return validateUpstreamSettings().block() ?: ValidateUpstreamSettingsResult.UPSTREAM_VALID
return runCatching {
validateUpstreamSettings().block(internalCallsTimeout) ?: ValidateUpstreamSettingsResult.UPSTREAM_VALID
}.getOrDefault(ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@ class GenericRpcHead(
override fun start() {
super.start()
refreshSubscription?.dispose()

val base = Flux.interval(interval)
.onBackpressureDrop()
.publishOn(headScheduler)
.filter { !isSyncing }
.flatMap {
.concatMap {
getLatestBlock(api)
}

refreshSubscription = super.follow(base)
}

Expand Down
Loading

0 comments on commit 8ac58f5

Please sign in to comment.