Skip to content

Commit

Permalink
Track upstream finalization data (#499)
Browse files Browse the repository at this point in the history
  • Loading branch information
Termina1 authored Jun 11, 2024
1 parent 3e898cf commit effe881
Show file tree
Hide file tree
Showing 32 changed files with 932 additions and 143 deletions.
2 changes: 1 addition & 1 deletion emerald-grpc
23 changes: 20 additions & 3 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io.emeraldpay.dshackle.rpc
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.protobuf.ByteString
import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.api.proto.Common
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.Global.Companion.nullValue
Expand Down Expand Up @@ -47,6 +48,7 @@ import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods
import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException
import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcResponseError
import io.emeraldpay.dshackle.upstream.finalization.FinalizationData
import io.emeraldpay.dshackle.upstream.rpcclient.CallParams
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import io.emeraldpay.dshackle.upstream.rpcclient.ObjectParams
Expand Down Expand Up @@ -243,6 +245,12 @@ open class NativeCall(
result.upstreamId = it.id
result.upstreamNodeVersion = it.nodeVersion
}
it.finalization?.let {
result.finalization = Common.FinalizationData.newBuilder()
.setHeight(it.height)
.setType(it.type.toProtoFinalizationType())
.build()
}
return result.build()
}

Expand Down Expand Up @@ -426,16 +434,20 @@ open class NativeCall(
val resolvedUpstreamData = it.resolvedUpstreamData ?: ctx.upstream.getUpstreamSettingsData()
validateResult(result, "local", ctx)
if (ctx.nonce != null) {
CallResult.ok(ctx.id, ctx.nonce, result, signer.sign(ctx.nonce, result, resolvedUpstreamData?.id ?: ctx.upstream.getId()), resolvedUpstreamData, ctx)
CallResult.ok(ctx.id, ctx.nonce, result, signer.sign(ctx.nonce, result, resolvedUpstreamData?.id ?: ctx.upstream.getId()), resolvedUpstreamData, ctx, it.finalization)
} else {
CallResult.ok(ctx.id, null, result, null, resolvedUpstreamData, ctx)
CallResult.ok(ctx.id, null, result, null, resolvedUpstreamData, ctx, it.finalization)
}
}
}.switchIfEmpty(
Mono.just(ctx).flatMap(this::executeOnRemote),
)
.onErrorResume {
Mono.just(CallResult.fail(ctx.id, ctx.nonce, it, ctx))
}.doOnNext {
if (it.finalization != null && it.upstreamSettingsData != null) {
ctx.upstream.addFinalization(it.finalization, it.upstreamSettingsData.id)
}
}
}

Expand Down Expand Up @@ -700,7 +712,7 @@ open class NativeCall(
}
}

open class CallResult(
open class CallResult @JvmOverloads constructor(
val id: Int,
val nonce: Long?,
val result: ByteArray?,
Expand All @@ -709,6 +721,7 @@ open class NativeCall(
val upstreamSettingsData: Upstream.UpstreamSettingsData?,
val ctx: ValidCallContext<ParsedCallDetails>?,
val stream: Flux<Chunk>? = null,
val finalization: FinalizationData? = null,
) {

constructor(
Expand All @@ -725,6 +738,10 @@ open class NativeCall(
return CallResult(id, nonce, result, null, signature, upstreamSettingsData, ctx)
}

fun ok(id: Int, nonce: Long?, result: ByteArray, signature: ResponseSigner.Signature?, upstreamSettingsData: Upstream.UpstreamSettingsData?, ctx: ValidCallContext<ParsedCallDetails>?, final: FinalizationData?): CallResult {
return CallResult(id, nonce, result, null, signature, upstreamSettingsData, ctx, null, final)
}

fun ok(id: Int, nonce: Long?, result: ByteArray, signature: ResponseSigner.Signature?, upstreamSettingsData: Upstream.UpstreamSettingsData?, ctx: ValidCallContext<ParsedCallDetails>?, stream: Flux<Chunk>?): CallResult {
return CallResult(id, nonce, result, null, signature, upstreamSettingsData, ctx, stream)
}
Expand Down
22 changes: 15 additions & 7 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/StreamHead.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import reactor.core.publisher.Mono
class StreamHead(
@Autowired private val multistreamHolder: MultistreamHolder,
) {

private val log = LoggerFactory.getLogger(StreamHead::class.java)

fun add(requestMono: Mono<Common.Chain>): Flux<BlockchainOuterClass.ChainHead> {
Expand All @@ -54,12 +53,20 @@ class StreamHead(

fun asProto(ms: Multistream, chain: Chain, block: BlockContainer): BlockchainOuterClass.ChainHead {
val msLowerBounds = ms.getLowerBounds()
val lowerBoundsProto = msLowerBounds
.map {
BlockchainOuterClass.LowerBound.newBuilder()
.setLowerBoundTimestamp(it.timestamp)
.setLowerBoundType(toProtoLowerBoundType(it.type))
.setLowerBoundValue(it.lowerBound)
val lowerBoundsProto =
msLowerBounds
.map {
BlockchainOuterClass.LowerBound.newBuilder()
.setLowerBoundTimestamp(it.timestamp)
.setLowerBoundType(toProtoLowerBoundType(it.type))
.setLowerBoundValue(it.lowerBound)
.build()
}
val finalizationData =
ms.getFinalizations().map {
Common.FinalizationData.newBuilder()
.setHeight(it.height)
.setType(it.type.toProtoFinalizationType())
.build()
}
val toOldApi = toOldApi(msLowerBounds)
Expand All @@ -72,6 +79,7 @@ class StreamHead(
.setCurrentLowerSlot(toOldApi.slot)
.setCurrentLowerDataTimestamp(toOldApi.timestamp)
.addAllLowerBounds(lowerBoundsProto)
.addAllFinalizationData(finalizationData)
.setTimestamp(block.timestamp.toEpochMilli())
.setWeight(ByteString.copyFrom(block.difficulty.toByteArray()))
.setBlockId(block.hash.toHex())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ open class GenericUpstreamCreator(
cs::upstreamRpcModulesDetector,
buildMethodsFun,
cs::lowerBoundService,
cs::finalizationDetectorBuilder,
)

upstream.start()
Expand Down
12 changes: 8 additions & 4 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/ChainResponse.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ package io.emeraldpay.dshackle.upstream
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.JsonSerializer
import com.fasterxml.jackson.databind.SerializerProvider
import io.emeraldpay.dshackle.upstream.finalization.FinalizationData
import io.emeraldpay.dshackle.upstream.signature.ResponseSigner
import io.emeraldpay.dshackle.upstream.stream.Chunk
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

class ChainResponse(
class ChainResponse @JvmOverloads constructor(
private val result: ByteArray?,
val error: ChainCallError?,
val id: Id,
Expand All @@ -33,15 +34,18 @@ class ChainResponse(
*/
val providedSignature: ResponseSigner.Signature? = null,
val resolvedUpstreamData: Upstream.UpstreamSettingsData? = null,
val finalization: FinalizationData? = null,
) {

constructor(stream: Flux<Chunk>, id: Int) :
this(null, null, NumberId(id.toLong()), stream, null, null)
this(null, null, NumberId(id.toLong()), stream, null, null, null)

constructor(result: ByteArray?, error: ChainCallError?) : this(result, error, NumberId(0), null)
constructor(result: ByteArray?, error: ChainCallError?) : this(result, error, NumberId(0), null, null)

constructor(result: ByteArray?, error: ChainCallError?, resolvedUpstreamData: Upstream.UpstreamSettingsData?) :
this(result, error, NumberId(0), null, null, resolvedUpstreamData)
this(result, error, NumberId(0), null, null, resolvedUpstreamData, null)
constructor(result: ByteArray?, resolvedUpstreamData: Upstream.UpstreamSettingsData?, finalization: FinalizationData) :
this(result, null, NumberId(0), null, null, resolvedUpstreamData, finalization)

companion object {
private val NULL_VALUE = "null".toByteArray()
Expand Down
15 changes: 15 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import io.emeraldpay.dshackle.startup.UpstreamChangeEvent
import io.emeraldpay.dshackle.upstream.calls.AggregatedCallMethods
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.calls.CallSelector
import io.emeraldpay.dshackle.upstream.finalization.FinalizationData
import io.emeraldpay.dshackle.upstream.finalization.FinalizationType
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import io.micrometer.core.instrument.Gauge
Expand All @@ -48,6 +50,7 @@ import reactor.core.publisher.Sinks
import reactor.core.scheduler.Scheduler
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import kotlin.math.max

/**
* Aggregation of multiple upstreams responding to a single blockchain
Expand Down Expand Up @@ -355,6 +358,18 @@ abstract class Multistream(
started = true
}

override fun getFinalizations(): Collection<FinalizationData> {
return getAll().flatMap { it.getFinalizations() }
.fold(mutableMapOf<FinalizationType, Long>()) { acc, data ->
acc[data.type] = max(acc[data.type] ?: 0, data.height)
acc
}.toList().map { FinalizationData(it.second, it.first) }
}

override fun addFinalization(finalization: FinalizationData, upstreamId: String) {
getAll().find { it.getId() == upstreamId }?.addFinalization(finalization, upstreamId)
}

override fun getLowerBounds(): Collection<LowerBoundData> {
return lowerBounds.values
}
Expand Down
72 changes: 61 additions & 11 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import io.emeraldpay.dshackle.upstream.MatchesResponse.NotMatchedResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.SameNodeResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.SlotHeightResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.Success
import io.emeraldpay.dshackle.upstream.finalization.FinalizationType
import io.emeraldpay.dshackle.upstream.lowerbound.fromProtoType
import org.apache.commons.lang3.StringUtils
import java.util.Collections
Expand All @@ -41,6 +42,51 @@ class Selector {
@JvmStatic
val anyLabel = AnyLabelMatcher()

sealed class HeightNumberOrTag {

companion object {
fun fromHeightSelector(selector: BlockchainOuterClass.HeightSelector): HeightNumberOrTag? {
return when (selector.heightOrNumberCase) {
BlockchainOuterClass.HeightSelector.HeightOrNumberCase.HEIGHTORNUMBER_NOT_SET ->
return if (selector.height == -1L) {
Latest
} else {
Number(selector.height)
}
BlockchainOuterClass.HeightSelector.HeightOrNumberCase.NUMBER -> Number(selector.number)
BlockchainOuterClass.HeightSelector.HeightOrNumberCase.TAG -> when (selector.tag) {
BlockchainOuterClass.BlockTag.SAFE -> Safe
BlockchainOuterClass.BlockTag.LATEST -> Latest
BlockchainOuterClass.BlockTag.PENDING -> Pending
BlockchainOuterClass.BlockTag.FINALIZED -> Finalized
else -> null
}
else -> null
}
}
}
class Number(val num: Long) : HeightNumberOrTag()
object Pending : HeightNumberOrTag()
object Latest : HeightNumberOrTag()
object Safe : HeightNumberOrTag()
object Finalized : HeightNumberOrTag()

fun getSort(): Sort {
return when (this) {
is Latest -> Sort(
compareByDescending {
it.getHead().getCurrentHeight()
},
)

is Safe -> Sort.safe
is Finalized -> Sort.finalized

else -> Sort.default
}
}
}

@JvmStatic
fun convertToUpstreamFilter(selectors: List<BlockchainOuterClass.Selector>): UpstreamFilter {
val matcher = selectors
Expand All @@ -50,15 +96,9 @@ class Selector {
SlotMatcher(it.slotHeightSelector.slotHeight)
}
it.hasHeightSelector() -> {
val height = if (it.heightSelector.height == -1L) {
null
} else {
it.heightSelector.height
}
if (height == null) {
empty
} else {
HeightMatcher(height)
when (val selector = HeightNumberOrTag.fromHeightSelector(it.heightSelector)) {
is HeightNumberOrTag.Number -> HeightMatcher(selector.num)
else -> empty
}
}
else -> empty
Expand All @@ -71,8 +111,8 @@ class Selector {

private fun getSort(selectors: List<BlockchainOuterClass.Selector>): Sort {
selectors.forEach { selector ->
if (selector.hasHeightSelector() && selector.heightSelector.height == -1L) {
return Sort(compareByDescending { it.getHead().getCurrentHeight() })
if (selector.hasHeightSelector()) {
return HeightNumberOrTag.fromHeightSelector(selector.heightSelector)?.getSort() ?: Sort.default
} else if (selector.hasLowerHeightSelector()) {
return Sort(
compareBy(nullsLast()) {
Expand Down Expand Up @@ -180,6 +220,16 @@ class Selector {
companion object {
@JvmStatic
val default = Sort(compareBy { null })
val safe = Sort(
compareByDescending { up ->
up.getFinalizations().find { it.type == FinalizationType.SAFE_BLOCK }?.height ?: 0L
},
)
val finalized = Sort(
compareByDescending { up ->
up.getFinalizations().find { it.type == FinalizationType.FINALIZED_BLOCK }?.height ?: 0L
},
)
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.startup.UpstreamChangeEvent
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.finalization.FinalizationData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import reactor.core.publisher.Flux
Expand Down Expand Up @@ -49,6 +50,8 @@ interface Upstream : Lifecycle {
fun isGrpc(): Boolean
fun getLowerBounds(): Collection<LowerBoundData>
fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData?
fun getFinalizations(): Collection<FinalizationData>
fun addFinalization(finalization: FinalizationData, upstreamId: String)
fun getUpstreamSettingsData(): UpstreamSettingsData?
fun updateLowerBound(lowerBound: Long, type: LowerBoundType)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.finalization.FinalizationData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import reactor.core.Disposable
Expand Down Expand Up @@ -81,6 +82,13 @@ open class BitcoinRpcUpstream(
return null
}

override fun getFinalizations(): Collection<FinalizationData> {
return emptyList()
}

override fun addFinalization(finalization: FinalizationData, upstreamId: String) {
}

override fun getUpstreamSettingsData(): Upstream.UpstreamSettingsData? {
return null
}
Expand Down
Loading

0 comments on commit effe881

Please sign in to comment.