Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track upstream finalization data #499

Merged
merged 9 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion emerald-grpc
23 changes: 20 additions & 3 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io.emeraldpay.dshackle.rpc
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.protobuf.ByteString
import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.api.proto.Common
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.Global.Companion.nullValue
Expand Down Expand Up @@ -47,6 +48,7 @@ import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods
import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException
import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcResponseError
import io.emeraldpay.dshackle.upstream.finalization.FinalizationData
import io.emeraldpay.dshackle.upstream.rpcclient.CallParams
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import io.emeraldpay.dshackle.upstream.rpcclient.ObjectParams
Expand Down Expand Up @@ -243,6 +245,12 @@ open class NativeCall(
result.upstreamId = it.id
result.upstreamNodeVersion = it.nodeVersion
}
it.finalization?.let {
result.finalization = Common.FinalizationData.newBuilder()
.setHeight(it.height)
.setType(it.type.toProtoFinalizationType())
.build()
}
return result.build()
}

Expand Down Expand Up @@ -426,16 +434,20 @@ open class NativeCall(
val resolvedUpstreamData = it.resolvedUpstreamData ?: ctx.upstream.getUpstreamSettingsData()
validateResult(result, "local", ctx)
if (ctx.nonce != null) {
CallResult.ok(ctx.id, ctx.nonce, result, signer.sign(ctx.nonce, result, resolvedUpstreamData?.id ?: ctx.upstream.getId()), resolvedUpstreamData, ctx)
CallResult.ok(ctx.id, ctx.nonce, result, signer.sign(ctx.nonce, result, resolvedUpstreamData?.id ?: ctx.upstream.getId()), resolvedUpstreamData, ctx, it.finalization)
} else {
CallResult.ok(ctx.id, null, result, null, resolvedUpstreamData, ctx)
CallResult.ok(ctx.id, null, result, null, resolvedUpstreamData, ctx, it.finalization)
}
}
}.switchIfEmpty(
Mono.just(ctx).flatMap(this::executeOnRemote),
)
.onErrorResume {
Mono.just(CallResult.fail(ctx.id, ctx.nonce, it, ctx))
}.doOnNext {
if (it.finalization != null && it.upstreamSettingsData != null) {
ctx.upstream.addFinalization(it.finalization, it.upstreamSettingsData.id)
}
}
}

Expand Down Expand Up @@ -700,7 +712,7 @@ open class NativeCall(
}
}

open class CallResult(
open class CallResult @JvmOverloads constructor(
val id: Int,
val nonce: Long?,
val result: ByteArray?,
Expand All @@ -709,6 +721,7 @@ open class NativeCall(
val upstreamSettingsData: Upstream.UpstreamSettingsData?,
val ctx: ValidCallContext<ParsedCallDetails>?,
val stream: Flux<Chunk>? = null,
val finalization: FinalizationData? = null,
) {

constructor(
Expand All @@ -725,6 +738,10 @@ open class NativeCall(
return CallResult(id, nonce, result, null, signature, upstreamSettingsData, ctx)
}

fun ok(id: Int, nonce: Long?, result: ByteArray, signature: ResponseSigner.Signature?, upstreamSettingsData: Upstream.UpstreamSettingsData?, ctx: ValidCallContext<ParsedCallDetails>?, final: FinalizationData?): CallResult {
return CallResult(id, nonce, result, null, signature, upstreamSettingsData, ctx, null, final)
}

fun ok(id: Int, nonce: Long?, result: ByteArray, signature: ResponseSigner.Signature?, upstreamSettingsData: Upstream.UpstreamSettingsData?, ctx: ValidCallContext<ParsedCallDetails>?, stream: Flux<Chunk>?): CallResult {
return CallResult(id, nonce, result, null, signature, upstreamSettingsData, ctx, stream)
}
Expand Down
22 changes: 15 additions & 7 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/StreamHead.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import reactor.core.publisher.Mono
class StreamHead(
@Autowired private val multistreamHolder: MultistreamHolder,
) {

private val log = LoggerFactory.getLogger(StreamHead::class.java)

fun add(requestMono: Mono<Common.Chain>): Flux<BlockchainOuterClass.ChainHead> {
Expand All @@ -54,12 +53,20 @@ class StreamHead(

fun asProto(ms: Multistream, chain: Chain, block: BlockContainer): BlockchainOuterClass.ChainHead {
val msLowerBounds = ms.getLowerBounds()
val lowerBoundsProto = msLowerBounds
.map {
BlockchainOuterClass.LowerBound.newBuilder()
.setLowerBoundTimestamp(it.timestamp)
.setLowerBoundType(toProtoLowerBoundType(it.type))
.setLowerBoundValue(it.lowerBound)
val lowerBoundsProto =
msLowerBounds
.map {
BlockchainOuterClass.LowerBound.newBuilder()
.setLowerBoundTimestamp(it.timestamp)
.setLowerBoundType(toProtoLowerBoundType(it.type))
.setLowerBoundValue(it.lowerBound)
.build()
}
val finalizationData =
ms.getFinalizations().map {
Common.FinalizationData.newBuilder()
.setHeight(it.height)
.setType(it.type.toProtoFinalizationType())
.build()
}
val toOldApi = toOldApi(msLowerBounds)
Expand All @@ -72,6 +79,7 @@ class StreamHead(
.setCurrentLowerSlot(toOldApi.slot)
.setCurrentLowerDataTimestamp(toOldApi.timestamp)
.addAllLowerBounds(lowerBoundsProto)
.addAllFinalizationData(finalizationData)
.setTimestamp(block.timestamp.toEpochMilli())
.setWeight(ByteString.copyFrom(block.difficulty.toByteArray()))
.setBlockId(block.hash.toHex())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ open class GenericUpstreamCreator(
cs::upstreamRpcModulesDetector,
buildMethodsFun,
cs::lowerBoundService,
cs::finalizationDetectorBuilder,
)

upstream.start()
Expand Down
12 changes: 8 additions & 4 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/ChainResponse.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ package io.emeraldpay.dshackle.upstream
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.JsonSerializer
import com.fasterxml.jackson.databind.SerializerProvider
import io.emeraldpay.dshackle.upstream.finalization.FinalizationData
import io.emeraldpay.dshackle.upstream.signature.ResponseSigner
import io.emeraldpay.dshackle.upstream.stream.Chunk
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

class ChainResponse(
class ChainResponse @JvmOverloads constructor(
private val result: ByteArray?,
val error: ChainCallError?,
val id: Id,
Expand All @@ -33,15 +34,18 @@ class ChainResponse(
*/
val providedSignature: ResponseSigner.Signature? = null,
val resolvedUpstreamData: Upstream.UpstreamSettingsData? = null,
val finalization: FinalizationData? = null,
) {

constructor(stream: Flux<Chunk>, id: Int) :
this(null, null, NumberId(id.toLong()), stream, null, null)
this(null, null, NumberId(id.toLong()), stream, null, null, null)

constructor(result: ByteArray?, error: ChainCallError?) : this(result, error, NumberId(0), null)
constructor(result: ByteArray?, error: ChainCallError?) : this(result, error, NumberId(0), null, null)

constructor(result: ByteArray?, error: ChainCallError?, resolvedUpstreamData: Upstream.UpstreamSettingsData?) :
this(result, error, NumberId(0), null, null, resolvedUpstreamData)
this(result, error, NumberId(0), null, null, resolvedUpstreamData, null)
constructor(result: ByteArray?, resolvedUpstreamData: Upstream.UpstreamSettingsData?, finalization: FinalizationData) :
this(result, null, NumberId(0), null, null, resolvedUpstreamData, finalization)

companion object {
private val NULL_VALUE = "null".toByteArray()
Expand Down
15 changes: 15 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import io.emeraldpay.dshackle.startup.UpstreamChangeEvent
import io.emeraldpay.dshackle.upstream.calls.AggregatedCallMethods
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.calls.CallSelector
import io.emeraldpay.dshackle.upstream.finalization.FinalizationData
import io.emeraldpay.dshackle.upstream.finalization.FinalizationType
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import io.micrometer.core.instrument.Gauge
Expand All @@ -48,6 +50,7 @@ import reactor.core.publisher.Sinks
import reactor.core.scheduler.Scheduler
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import kotlin.math.max

/**
* Aggregation of multiple upstreams responding to a single blockchain
Expand Down Expand Up @@ -355,6 +358,18 @@ abstract class Multistream(
started = true
}

override fun getFinalizations(): Collection<FinalizationData> {
return getAll().flatMap { it.getFinalizations() }
.fold(mutableMapOf<FinalizationType, Long>()) { acc, data ->
acc[data.type] = max(acc[data.type] ?: 0, data.height)
acc
}.toList().map { FinalizationData(it.second, it.first) }
}

override fun addFinalization(finalization: FinalizationData, upstreamId: String) {
getAll().find { it.getId() == upstreamId }?.addFinalization(finalization, upstreamId)
}

override fun getLowerBounds(): Collection<LowerBoundData> {
return lowerBounds.values
}
Expand Down
72 changes: 61 additions & 11 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import io.emeraldpay.dshackle.upstream.MatchesResponse.NotMatchedResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.SameNodeResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.SlotHeightResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.Success
import io.emeraldpay.dshackle.upstream.finalization.FinalizationType
import io.emeraldpay.dshackle.upstream.lowerbound.fromProtoType
import org.apache.commons.lang3.StringUtils
import java.util.Collections
Expand All @@ -41,6 +42,51 @@ class Selector {
@JvmStatic
val anyLabel = AnyLabelMatcher()

sealed class HeightNumberOrTag {

companion object {
fun fromHeightSelector(selector: BlockchainOuterClass.HeightSelector): HeightNumberOrTag? {
return when (selector.heightOrNumberCase) {
BlockchainOuterClass.HeightSelector.HeightOrNumberCase.HEIGHTORNUMBER_NOT_SET ->
return if (selector.height == -1L) {
Latest
} else {
Number(selector.height)
}
BlockchainOuterClass.HeightSelector.HeightOrNumberCase.NUMBER -> Number(selector.number)
BlockchainOuterClass.HeightSelector.HeightOrNumberCase.TAG -> when (selector.tag) {
BlockchainOuterClass.BlockTag.SAFE -> Safe
BlockchainOuterClass.BlockTag.LATEST -> Latest
BlockchainOuterClass.BlockTag.PENDING -> Pending
BlockchainOuterClass.BlockTag.FINALIZED -> Finalized
else -> null
}
else -> null
}
}
}
class Number(val num: Long) : HeightNumberOrTag()
object Pending : HeightNumberOrTag()
object Latest : HeightNumberOrTag()
object Safe : HeightNumberOrTag()
object Finalized : HeightNumberOrTag()

fun getSort(): Sort {
return when (this) {
is Latest -> Sort(
compareByDescending {
it.getHead().getCurrentHeight()
},
)

is Safe -> Sort.safe
is Finalized -> Sort.finalized

else -> Sort.default
}
}
}

@JvmStatic
fun convertToUpstreamFilter(selectors: List<BlockchainOuterClass.Selector>): UpstreamFilter {
val matcher = selectors
Expand All @@ -50,15 +96,9 @@ class Selector {
SlotMatcher(it.slotHeightSelector.slotHeight)
}
it.hasHeightSelector() -> {
val height = if (it.heightSelector.height == -1L) {
null
} else {
it.heightSelector.height
}
if (height == null) {
empty
} else {
HeightMatcher(height)
when (val selector = HeightNumberOrTag.fromHeightSelector(it.heightSelector)) {
is HeightNumberOrTag.Number -> HeightMatcher(selector.num)
else -> empty
}
}
else -> empty
Expand All @@ -71,8 +111,8 @@ class Selector {

private fun getSort(selectors: List<BlockchainOuterClass.Selector>): Sort {
selectors.forEach { selector ->
if (selector.hasHeightSelector() && selector.heightSelector.height == -1L) {
return Sort(compareByDescending { it.getHead().getCurrentHeight() })
if (selector.hasHeightSelector()) {
return HeightNumberOrTag.fromHeightSelector(selector.heightSelector)?.getSort() ?: Sort.default
} else if (selector.hasLowerHeightSelector()) {
return Sort(
compareBy(nullsLast()) {
Expand Down Expand Up @@ -180,6 +220,16 @@ class Selector {
companion object {
@JvmStatic
val default = Sort(compareBy { null })
val safe = Sort(
compareByDescending { up ->
up.getFinalizations().find { it.type == FinalizationType.SAFE_BLOCK }?.height ?: 0L
},
)
val finalized = Sort(
compareByDescending { up ->
up.getFinalizations().find { it.type == FinalizationType.FINALIZED_BLOCK }?.height ?: 0L
},
)
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.startup.UpstreamChangeEvent
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.finalization.FinalizationData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import reactor.core.publisher.Flux
Expand Down Expand Up @@ -49,6 +50,8 @@ interface Upstream : Lifecycle {
fun isGrpc(): Boolean
fun getLowerBounds(): Collection<LowerBoundData>
fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData?
fun getFinalizations(): Collection<FinalizationData>
fun addFinalization(finalization: FinalizationData, upstreamId: String)
fun getUpstreamSettingsData(): UpstreamSettingsData?
fun updateLowerBound(lowerBound: Long, type: LowerBoundType)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.finalization.FinalizationData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import reactor.core.Disposable
Expand Down Expand Up @@ -81,6 +82,13 @@ open class BitcoinRpcUpstream(
return null
}

override fun getFinalizations(): Collection<FinalizationData> {
return emptyList()
}

override fun addFinalization(finalization: FinalizationData, upstreamId: String) {
}

override fun getUpstreamSettingsData(): Upstream.UpstreamSettingsData? {
return null
}
Expand Down
Loading
Loading