Skip to content

Commit

Permalink
More tunes
Browse files Browse the repository at this point in the history
  • Loading branch information
Кирилл committed Aug 26, 2024
1 parent ab91c28 commit 42d99b5
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 11 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ jib {
}
}
container {
jvmFlags = ['-XX:+UseG1GC', '-XX:+ExitOnOutOfMemoryError', '-XX:NativeMemoryTracking=summary', '-XX:+UnlockDiagnosticVMOptions', '-XX:GCLockerRetryAllocationCount=10', '--enable-preview']
jvmFlags = ['-XX:+UseG1GC', '-XX:+ExitOnOutOfMemoryError', '-Xms1024M', '-XX:NativeMemoryTracking=summary', '-XX:+UnlockDiagnosticVMOptions', '-XX:GCLockerRetryAllocationCount=10', '--enable-preview']
mainClass = 'io.emeraldpay.dshackle.StarterKt'
args = []
ports = ['2448', '2449', '8545']
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package io.emeraldpay.dshackle.commons

import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import zmq.util.function.Supplier
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Function

object FluxIntervalWrapper {

Expand All @@ -11,8 +14,8 @@ object FluxIntervalWrapper {
// which prevents performing the inner flatMap if it is not finished in the previous step
fun <T> interval(
period: Duration,
mapper: Flux<T>,
transformer: (Flux<Long>) -> Flux<Long>,
mapper: Supplier<Mono<T>>,
transformer: Function<Flux<Long>, Flux<Long>>,
): Flux<T> {
val isProcessing = AtomicBoolean(false)

Expand All @@ -21,8 +24,8 @@ object FluxIntervalWrapper {
.filter { !isProcessing.get() }
.flatMap {
isProcessing.set(true)
mapper
.doFinally { isProcessing.set(false) }
mapper.get()
.doOnTerminate { isProcessing.set(false) }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ abstract class HttpReader(

init {
val connectionProvider = ConnectionProvider.builder("dshackleConnectionPool")
.maxConnections(500)
.maxConnections(1500)
.pendingAcquireMaxCount(1000)
.pendingAcquireTimeout(Duration.ofSeconds(10))
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.emeraldpay.dshackle.upstream.ethereum

import com.fasterxml.jackson.module.kotlin.readValue
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
Expand Down Expand Up @@ -72,6 +73,7 @@ class EthereumFinalizationDetector : FinalizationDetector {
upstream
.getIngressReader()
.read(req)
.timeout(Defaults.internalCallsTimeout)
.flatMap {
it.requireResult().flatMap { result ->
val block = Global.objectMapper.readValue<BlockJson<TransactionRefJson>>(result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import reactor.core.Disposable
import reactor.core.scheduler.Scheduler
import reactor.kotlin.core.publisher.toFlux
import java.time.Duration

class GenericRpcHead(
Expand All @@ -49,8 +48,9 @@ class GenericRpcHead(
refreshSubscription?.dispose()
val base = FluxIntervalWrapper.interval(
interval,
getLatestBlock(api).toFlux(),
) { it.publishOn(headScheduler).filter { !isSyncing } }
{ getLatestBlock(api) },
{ it.publishOn(headScheduler).filter { !isSyncing } },
)
refreshSubscription = super.follow(base)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,10 +444,10 @@ class GenericWsHeadTest {
StepVerifier.create(wsHead.headLiveness())
.expectNext(HeadLivenessState.FATAL_ERROR)
.thenCancel()
.verify(Duration.ofSeconds(1))
.verify(Duration.ofSeconds(2))
}
.thenCancel()
.verify(Duration.ofSeconds(1))
.verify(Duration.ofSeconds(3))

verify(connection, times(2)).callRpc(ChainRequest("eth_chainId", ListParams()))
verify(connection, times(2)).callRpc(ChainRequest("net_version", ListParams()))
Expand Down

0 comments on commit 42d99b5

Please sign in to comment.