diff --git a/build.gradle b/build.gradle index 52166c196..72622b8a5 100644 --- a/build.gradle +++ b/build.gradle @@ -190,7 +190,7 @@ jib { } } container { - jvmFlags = ['-XX:+UseG1GC', '-XX:+ExitOnOutOfMemoryError', '-XX:NativeMemoryTracking=summary', '-XX:+UnlockDiagnosticVMOptions', '-XX:GCLockerRetryAllocationCount=10', '--enable-preview'] + jvmFlags = ['-XX:+UseG1GC', '-XX:+ExitOnOutOfMemoryError', '-Xms1024M', '-XX:NativeMemoryTracking=summary', '-XX:+UnlockDiagnosticVMOptions', '-XX:GCLockerRetryAllocationCount=10', '--enable-preview'] mainClass = 'io.emeraldpay.dshackle.StarterKt' args = [] ports = ['2448', '2449', '8545'] diff --git a/src/main/kotlin/io/emeraldpay/dshackle/commons/FluxIntervalWrapper.kt b/src/main/kotlin/io/emeraldpay/dshackle/commons/FluxIntervalWrapper.kt index 314a660e7..343376a8e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/commons/FluxIntervalWrapper.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/commons/FluxIntervalWrapper.kt @@ -1,8 +1,11 @@ package io.emeraldpay.dshackle.commons import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import zmq.util.function.Supplier import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean +import java.util.function.Function object FluxIntervalWrapper { @@ -11,8 +14,8 @@ object FluxIntervalWrapper { // which prevents performing the inner flatMap if it is not finished in the previous step fun interval( period: Duration, - mapper: Flux, - transformer: (Flux) -> Flux, + mapper: Supplier>, + transformer: Function, Flux>, ): Flux { val isProcessing = AtomicBoolean(false) @@ -21,8 +24,8 @@ object FluxIntervalWrapper { .filter { !isProcessing.get() } .flatMap { isProcessing.set(true) - mapper - .doFinally { isProcessing.set(false) } + mapper.get() + .doOnTerminate { isProcessing.set(false) } } } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/HttpReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/HttpReader.kt index 93a9d273f..69cfaff9c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/HttpReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/HttpReader.kt @@ -31,7 +31,7 @@ abstract class HttpReader( init { val connectionProvider = ConnectionProvider.builder("dshackleConnectionPool") - .maxConnections(500) + .maxConnections(1500) .pendingAcquireMaxCount(1000) .pendingAcquireTimeout(Duration.ofSeconds(10)) .build() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt index afffb61d1..0c8722f69 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt @@ -2,6 +2,7 @@ package io.emeraldpay.dshackle.upstream.ethereum import com.fasterxml.jackson.module.kotlin.readValue import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.Upstream @@ -72,6 +73,7 @@ class EthereumFinalizationDetector : FinalizationDetector { upstream .getIngressReader() .read(req) + .timeout(Defaults.internalCallsTimeout) .flatMap { it.requireResult().flatMap { result -> val block = Global.objectMapper.readValue>(result) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericRpcHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericRpcHead.kt index 2f66ef9b9..4b5e2e208 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericRpcHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericRpcHead.kt @@ -26,7 +26,6 @@ import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice import reactor.core.Disposable import reactor.core.scheduler.Scheduler -import reactor.kotlin.core.publisher.toFlux import java.time.Duration class GenericRpcHead( @@ -49,8 +48,9 @@ class GenericRpcHead( refreshSubscription?.dispose() val base = FluxIntervalWrapper.interval( interval, - getLatestBlock(api).toFlux(), - ) { it.publishOn(headScheduler).filter { !isSyncing } } + { getLatestBlock(api) }, + { it.publishOn(headScheduler).filter { !isSyncing } }, + ) refreshSubscription = super.follow(base) } diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadTest.kt index 49ddb9877..b405bad2a 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadTest.kt @@ -444,10 +444,10 @@ class GenericWsHeadTest { StepVerifier.create(wsHead.headLiveness()) .expectNext(HeadLivenessState.FATAL_ERROR) .thenCancel() - .verify(Duration.ofSeconds(1)) + .verify(Duration.ofSeconds(2)) } .thenCancel() - .verify(Duration.ofSeconds(1)) + .verify(Duration.ofSeconds(3)) verify(connection, times(2)).callRpc(ChainRequest("eth_chainId", ListParams())) verify(connection, times(2)).callRpc(ChainRequest("net_version", ListParams()))