Skip to content

Commit

Permalink
Merge branch 'drpcorg:master' into feature/aarch64-ubuntu-2404
Browse files Browse the repository at this point in the history
  • Loading branch information
gilbahat authored Sep 26, 2024
2 parents 01b24d1 + 7c4d061 commit 38a8586
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class BitcoinUpstreamCreator(
MergedHead(listOf(rpcHead, zeroMqHead), MostWorkForkChoice(), headScheduler)
} ?: rpcHead

val methods = buildMethods(config, chain)
val methods = buildMethods(config, chain, options)
val upstream = BitcoinRpcUpstream(
config.id
?: "bitcoin-${seq.getAndIncrement()}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ open class GenericUpstreamCreator(
if (it.connectorMode == GenericConnectorFactory.ConnectorMode.RPC_REQUESTS_WITH_MIXED_HEAD.name) it.rpc?.url ?: it.ws?.url else it.ws?.url ?: it.rpc?.url
}
val hash = getHash(nodeId, hashUrl!!, hashes)
val buildMethodsFun = { a: UpstreamsConfig.Upstream<*>, b: Chain -> this.buildMethods(a, b) }
val buildMethodsFun = { a: UpstreamsConfig.Upstream<*>, b: Chain -> this.buildMethods(a, b, options) }

val upstream = GenericUpstream(
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.emeraldpay.dshackle.config.ChainsConfig
import io.emeraldpay.dshackle.config.IndexConfig
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.foundation.ChainOptions.Options
import io.emeraldpay.dshackle.upstream.CallTargetsHolder
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.calls.ManagedCallMethods
Expand Down Expand Up @@ -70,7 +71,7 @@ abstract class UpstreamCreator(
chainConf: ChainsConfig.ChainConfig,
): UpstreamCreationData

protected fun buildMethods(config: UpstreamsConfig.Upstream<*>, chain: Chain): CallMethods {
protected fun buildMethods(config: UpstreamsConfig.Upstream<*>, chain: Chain, options: Options): CallMethods {
return if (config.methods != null || config.methodGroups != null) {
if (config.methodGroups == null) {
config.methodGroups = UpstreamsConfig.MethodGroups(setOf("filter"), setOf())
Expand All @@ -82,7 +83,7 @@ abstract class UpstreamCreator(
}

ManagedCallMethods(
delegate = callTargets.getDefaultMethods(chain, indexConfig.isChainEnabled(chain)),
delegate = callTargets.getDefaultMethods(chain, indexConfig.isChainEnabled(chain), options),
enabled = config.methods?.enabled?.map { it.name }?.toSet() ?: emptySet(),
disabled = config.methods?.disabled?.map { it.name }?.toSet() ?: emptySet(),
groupsEnabled = config.methodGroups?.enabled ?: emptySet(),
Expand All @@ -98,7 +99,7 @@ abstract class UpstreamCreator(
}
}
} else {
callTargets.getDefaultMethods(chain, indexConfig.isChainEnabled(chain))
callTargets.getDefaultMethods(chain, indexConfig.isChainEnabled(chain), options)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.emeraldpay.dshackle.BlockchainType.SOLANA
import io.emeraldpay.dshackle.BlockchainType.STARKNET
import io.emeraldpay.dshackle.BlockchainType.UNKNOWN
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.calls.DefaultBeaconChainMethods
import io.emeraldpay.dshackle.upstream.calls.DefaultBitcoinMethods
Expand All @@ -23,13 +24,13 @@ import org.springframework.stereotype.Component
class CallTargetsHolder {
private val callTargets = HashMap<Chain, CallMethods>()

fun getDefaultMethods(chain: Chain, hasLogsOracle: Boolean): CallMethods {
return callTargets[chain] ?: return setupDefaultMethods(chain, hasLogsOracle)
fun getDefaultMethods(chain: Chain, hasLogsOracle: Boolean, options: ChainOptions.Options): CallMethods {
return callTargets[chain] ?: return setupDefaultMethods(chain, hasLogsOracle, options)
}

private fun setupDefaultMethods(chain: Chain, hasLogsOracle: Boolean): CallMethods {
private fun setupDefaultMethods(chain: Chain, hasLogsOracle: Boolean, options: ChainOptions.Options): CallMethods {
val created = when (chain.type) {
BITCOIN -> DefaultBitcoinMethods()
BITCOIN -> DefaultBitcoinMethods(options.providesBalance == true)
ETHEREUM -> DefaultEthereumMethods(chain, hasLogsOracle)
STARKNET -> DefaultStarknetMethods(chain)
POLKADOT -> DefaultPolkadotMethods(chain)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ open class BitcoinMultistream(
private var reader = BitcoinReader(this, head, esplora)
private var addressActiveCheck: AddressActiveCheck? = null
private var xpubAddresses: XpubAddresses? = null
private var callRouter: LocalCallRouter = LocalCallRouter(DefaultBitcoinMethods(), reader)
private var callRouter: LocalCallRouter = LocalCallRouter(DefaultBitcoinMethods(sourceUpstreams.any { it.getOptions().providesBalance == true }), reader)
override fun getUpstreams(): MutableList<out Upstream> {
return sourceUpstreams
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ abstract class BitcoinUpstream(
options: ChainOptions.Options,
role: UpstreamsConfig.UpstreamRole,
chainConfig: ChainsConfig.ChainConfig,
) : this(id, chain, options, role, DefaultBitcoinMethods(), QuorumForLabels.QuorumItem.empty(), null, chainConfig)
) : this(id, chain, options, role, DefaultBitcoinMethods(options.providesBalance == true), QuorumForLabels.QuorumItem.empty(), null, chainConfig)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import io.emeraldpay.dshackle.quorum.NotNullQuorum
import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException
import java.util.Collections

class DefaultBitcoinMethods : CallMethods {
class DefaultBitcoinMethods(balances: Boolean) : CallMethods {

private val networkinfo = Global.objectMapper.writeValueAsBytes(
mapOf(
Expand All @@ -49,7 +49,6 @@ class DefaultBitcoinMethods : CallMethods {
"getbestblockhash",
"getblocknumber",
"getblockcount",
"listunspent",
"getreceivedbyaddress",
"getblockchaininfo",
).sorted()
Expand All @@ -63,8 +62,12 @@ class DefaultBitcoinMethods : CallMethods {
"sendrawtransaction",
).sorted()

private val withBalances = listOf(
"listunspent",
)

private val allowedMethods =
(freshMethods + anyResponseMethods + headVerifiedMethods + broadcastMethods).sorted()
(freshMethods + anyResponseMethods + headVerifiedMethods + broadcastMethods + if (balances) withBalances else listOf()).sorted()

override fun createQuorumFor(method: String): CallQuorum {
return when {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import reactor.core.publisher.Sinks
import reactor.core.scheduler.Scheduler
import reactor.kotlin.core.publisher.switchIfEmpty
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

Expand Down Expand Up @@ -64,34 +65,34 @@ class GenericWsHead(
}
private val chainIdValidator = chainSpecific.chainSettingsValidator(upstream.getChain(), upstream, jsonRpcWsClient)

private var connectionId: String? = null
private var subscribed = false
private var connected = false
private var isSyncing = false
private val connectionId = AtomicReference<String?>(null)
private val subscribed = AtomicBoolean(false)
private val connected = AtomicBoolean(false)
private val isSyncing = AtomicBoolean(false)

private var subscription: Disposable? = null
private var headResubSubscription: Disposable? = null
private val subscription = AtomicReference<Disposable?>()
private val headResubSubscription = AtomicReference<Disposable?>()
private val noHeadUpdatesSink = Sinks.many().multicast().directBestEffort<Boolean>()

private var subscriptionId = AtomicReference("")
private val subscriptionId = AtomicReference("")

override fun isRunning(): Boolean {
return subscription != null
return subscription.get() != null
}

override fun start() {
super.start()
this.subscription?.dispose()
this.subscribed = true
this.subscription.get()?.dispose()
this.subscribed.set(true)
val heads = Flux.merge(
// get the current block, not just wait for the next update
getLatestBlock(api),
listenNewHeads(),
)
this.subscription = super.follow(heads)
this.subscription.set(super.follow(heads))

if (headResubSubscription == null) {
headResubSubscription = registerHeadResubscribeFlux()
if (headResubSubscription.get() == null) {
headResubSubscription.set(registerHeadResubscribeFlux())
}
}

Expand All @@ -100,10 +101,10 @@ class GenericWsHead(
}

override fun onSyncingNode(isSyncing: Boolean) {
if (isSyncing && !this.isSyncing) {
if (isSyncing && !this.isSyncing.get()) {
cancelSub()
}
this.isSyncing = isSyncing
this.isSyncing.set(isSyncing)
}

private fun listenNewHeads(): Flux<BlockContainer> {
Expand All @@ -129,7 +130,7 @@ class GenericWsHead(
}
UPSTREAM_SETTINGS_ERROR -> {
log.warn("Couldn't check chain settings via ws connection for {}, ws sub will be recreated", upstreamId)
subscribed = false
subscribed.set(false)
Mono.empty()
}
UPSTREAM_FATAL_SETTINGS_ERROR -> {
Expand All @@ -144,16 +145,15 @@ class GenericWsHead(
override fun stop() {
super.stop()
cancelSub()
headResubSubscription?.dispose()
headResubSubscription = null
headResubSubscription.getAndSet(null)?.dispose()
}

override fun chainIdValidator(): SingleValidator<ValidateUpstreamSettingsResult>? {
return chainIdValidator
}

private fun unsubscribe(): Mono<BlockContainer> {
subscribed = false
subscribed.set(false)
return wsSubscriptions.unsubscribe(chainSpecific.unsubscribeNewHeadsRequest(subscriptionId.get()).copy(id = ids.getAndIncrement()))
.flatMap { it.requireResult() }
.doOnNext { log.warn("{} has just unsubscribed from newHeads", upstreamId) }
Expand All @@ -170,10 +170,10 @@ class GenericWsHead(
return try {
wsSubscriptions.subscribe(chainSpecific.listenNewHeadsRequest().copy(id = ids.getAndIncrement()))
.also {
connectionId = it.connectionId
subscriptionId = it.subId
if (!connected) {
connected = true
connectionId.set(it.connectionId)
subscriptionId.set(it.subId.get())
if (!connected.get()) {
connected.set(true)
}
}.data
} catch (e: Exception) {
Expand All @@ -184,13 +184,13 @@ class GenericWsHead(
private fun registerHeadResubscribeFlux(): Disposable {
val connectionStates = wsSubscriptions.connectionInfoFlux()
.map {
if (it.connectionId == connectionId && it.connectionState == WsConnection.ConnectionState.DISCONNECTED) {
if (it.connectionId == connectionId.get() && it.connectionState == WsConnection.ConnectionState.DISCONNECTED) {
headLivenessSink.emitNext(HeadLivenessState.DISCONNECTED) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
subscribed = false
connected = false
connectionId = null
subscribed.set(false)
connected.set(false)
connectionId.set(null)
} else if (it.connectionState == WsConnection.ConnectionState.CONNECTED) {
connected = true
connected.set(true)
return@map true
}
return@map false
Expand All @@ -200,16 +200,15 @@ class GenericWsHead(
noHeadUpdatesSink.asFlux(),
connectionStates,
).publishOn(wsConnectionResubscribeScheduler)
.filter { it && !subscribed && connected && !isSyncing }
.filter { it && !subscribed.get() && connected.get() && !isSyncing.get() }
.subscribe {
log.warn("Restart ws head, upstreamId: $upstreamId")
start()
}
}

private fun cancelSub() {
subscription?.dispose()
subscription = null
subscribed = false
subscription.getAndSet(null)?.dispose()
subscribed.set(false)
}
}
Loading

0 comments on commit 38a8586

Please sign in to comment.