Skip to content

Commit

Permalink
remove unused specific tracks (#326)
Browse files Browse the repository at this point in the history
merge ethereum interfaces with generic ones
  • Loading branch information
a10zn8 authored Oct 24, 2023
1 parent 1310c8b commit 788db75
Show file tree
Hide file tree
Showing 34 changed files with 75 additions and 3,135 deletions.
96 changes: 0 additions & 96 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/BlockchainRpc.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import io.emeraldpay.api.proto.Common
import io.emeraldpay.api.proto.ReactorBlockchainGrpc
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.ChainValue
import io.emeraldpay.dshackle.SilentException
import io.emeraldpay.dshackle.config.spans.ProviderSpanHandler
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Metrics
Expand All @@ -36,7 +35,6 @@ import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

Expand All @@ -46,11 +44,8 @@ class BlockchainRpc(
private val nativeCallStream: NativeCallStream,
private val nativeSubscribe: NativeSubscribe,
private val streamHead: StreamHead,
private val trackTx: List<TrackTx>,
private val trackAddress: List<TrackAddress>,
private val describe: Describe,
private val subscribeStatus: SubscribeStatus,
private val estimateFee: EstimateFee,
private val subscribeNodeStatus: SubscribeNodeStatus,
@Qualifier("rpcScheduler")
private val scheduler: Scheduler,
Expand Down Expand Up @@ -129,97 +124,6 @@ class BlockchainRpc(
).doOnError { failMetric.increment() }
}

override fun subscribeTxStatus(requestMono: Mono<BlockchainOuterClass.TxStatusRequest>): Flux<BlockchainOuterClass.TxStatus> {
return requestMono.subscribeOn(scheduler).flatMapMany { request ->
val chain = Chain.byId(request.chainValue)
val metrics = chainMetrics.get(chain)
metrics.subscribeTxMetric.increment()
try {
trackTx.find { it.isSupported(chain) }?.let { track ->
track.subscribe(request)
.doOnNext { metrics.subscribeHeadRespMetric.increment() }
.doOnError { failMetric.increment() }
} ?: Flux.error(SilentException.UnsupportedBlockchain(chain))
} catch (t: Throwable) {
log.error("Internal error during Tx Subscription", t)
failMetric.increment()
Flux.error(IllegalStateException("Internal Error"))
}
}
}

override fun subscribeBalance(requestMono: Mono<BlockchainOuterClass.BalanceRequest>): Flux<BlockchainOuterClass.AddressBalance> {
return requestMono.subscribeOn(scheduler).flatMapMany { request ->
val chain = Chain.byId(request.asset.chainValue)
val metrics = chainMetrics.get(chain)
metrics.subscribeBalanceMetric.increment()
val asset = request.asset.code.lowercase(Locale.getDefault())
try {
trackAddress.find { it.isSupported(chain, asset) }?.let { track ->
track.subscribe(request)
.doOnNext { metrics.subscribeBalanceRespMetric.increment() }
.doOnError { failMetric.increment() }
} ?: Flux.error<BlockchainOuterClass.AddressBalance>(SilentException.UnsupportedBlockchain(chain))
.doOnSubscribe {
log.error("Balance for $chain:$asset is not supported")
}
} catch (t: Throwable) {
log.error("Internal error during Balance Subscription", t)
failMetric.increment()
Flux.error(IllegalStateException("Internal Error"))
}
}
}

override fun getBalance(requestMono: Mono<BlockchainOuterClass.BalanceRequest>): Flux<BlockchainOuterClass.AddressBalance> {
return requestMono.subscribeOn(scheduler).flatMapMany { request ->
val chain = Chain.byId(request.asset.chainValue)
val metrics = chainMetrics.get(chain)
metrics.getBalanceMetric.increment()
val asset = request.asset.code.lowercase(Locale.getDefault())
val startTime = System.currentTimeMillis()
try {
trackAddress.find { it.isSupported(chain, asset) }?.let { track ->
track.getBalance(request)
.doOnNext {
metrics.getBalanceRespMetric.record(
System.currentTimeMillis() - startTime,
TimeUnit.MILLISECONDS,
)
}
} ?: Flux.error<BlockchainOuterClass.AddressBalance>(SilentException.UnsupportedBlockchain(chain))
.doOnSubscribe {
log.error("Balance for $chain:$asset is not supported")
}
} catch (t: Throwable) {
log.error("Internal error during Balance Request", t)
failMetric.increment()
Flux.error<BlockchainOuterClass.AddressBalance>(IllegalStateException("Internal Error"))
}
}
}

override fun estimateFee(request: Mono<BlockchainOuterClass.EstimateFeeRequest>): Mono<BlockchainOuterClass.EstimateFeeResponse> {
return request
.subscribeOn(scheduler)
.flatMap {
val chain = Chain.byId(it.chainValue)
val metrics = chainMetrics.get(chain)
metrics.estimateFeeMetric.increment()
val startTime = System.currentTimeMillis()
estimateFee.estimateFee(it).doFinally {
metrics.estimateFeeRespMetric.record(
System.currentTimeMillis() - startTime,
TimeUnit.MILLISECONDS,
)
}
}
.doOnError { t ->
log.error("Internal error during Fee Estimation", t)
failMetric.increment()
}
}

override fun describe(request: Mono<BlockchainOuterClass.DescribeRequest>): Mono<BlockchainOuterClass.DescribeResponse> {
describeMetric.increment()
return describe.describe(request)
Expand Down
38 changes: 0 additions & 38 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/EstimateFee.kt

This file was deleted.

15 changes: 3 additions & 12 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.google.protobuf.ByteString
import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.dshackle.BlockchainType
import io.emeraldpay.dshackle.BlockchainType.EVM_POS
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.Global.Companion.nullValue
Expand All @@ -44,9 +45,6 @@ import io.emeraldpay.dshackle.upstream.MultistreamHolder
import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods
import io.emeraldpay.dshackle.upstream.calls.EthereumCallSelector
import io.emeraldpay.dshackle.upstream.ethereum.EthereumLikeMultistream
import io.emeraldpay.dshackle.upstream.ethereum.EthereumMultistream
import io.emeraldpay.dshackle.upstream.ethereum.EthereumPosMultiStream
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcError
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcException
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
Expand Down Expand Up @@ -84,17 +82,10 @@ open class NativeCall(
var rpcReaderFactory: RpcReaderFactory = RpcReaderFactory.default()
private val ethereumCallSelectors = EnumMap<Chain, EthereumCallSelector>(Chain::class.java)

companion object {
val casting: Map<BlockchainType, Class<out EthereumLikeMultistream>> = mapOf(
BlockchainType.EVM_POS to EthereumPosMultiStream::class.java,
BlockchainType.EVM_POW to EthereumMultistream::class.java,
)
}

@EventListener
fun onUpstreamChangeEvent(event: UpstreamChangeEvent) {
casting[BlockchainType.from(event.chain)]?.let { cast ->
multistreamHolder.getUpstream(event.chain).let { up ->
multistreamHolder.getUpstream(event.chain).let { up ->
if (BlockchainType.from(up.chain) == EVM_POS) {
ethereumCallSelectors.putIfAbsent(
event.chain,
EthereumCallSelector(up.caches),
Expand Down
10 changes: 5 additions & 5 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import io.emeraldpay.dshackle.BlockchainType
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.SilentException
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.MultistreamHolder
import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.ethereum.EthereumLikeMultistream
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.json.HasUpstream
import io.emeraldpay.dshackle.upstream.signature.ResponseSigner
import io.grpc.Status
Expand Down Expand Up @@ -67,9 +67,9 @@ open class NativeSubscribe(
/**
* Try to proxy request subscription directly to the upstream dshackle instance.
* If not possible - performs subscription logic on the current instance
* @see EthereumLikeMultistream.tryProxy
* @see EthereumLikeMultistream.tryProxySubscribe
*/
val publisher = getUpstream(chain).tryProxy(matcher, request) ?: run {
val publisher = getUpstream(chain).tryProxySubscribe(matcher, request) ?: run {
val method = request.method
val params: Any? = request.payload?.takeIf { !it.isEmpty }?.let {
objectMapper.readValue(it.newInput(), Map::class.java)
Expand Down Expand Up @@ -103,8 +103,8 @@ open class NativeSubscribe(
log.error("Error during subscription to $method, chain $chain, params $params", it)
}

private fun getUpstream(chain: Chain): EthereumLikeMultistream =
multistreamHolder.getUpstream(chain).let { it as EthereumLikeMultistream }
private fun getUpstream(chain: Chain): Multistream =
multistreamHolder.getUpstream(chain)

fun convertToProto(holder: ResponseHolder): NativeSubscribeReplyItem {
if (holder.response is NativeSubscribeReplyItem) {
Expand Down
30 changes: 0 additions & 30 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackAddress.kt

This file was deleted.

Loading

0 comments on commit 788db75

Please sign in to comment.