Skip to content

Commit

Permalink
Pass all upstreams that resolved a request (#531)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Jul 25, 2024
1 parent 008b4c5 commit ee8181e
Show file tree
Hide file tree
Showing 27 changed files with 133 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ abstract class BaseHandler(
// If Proxy is configured to preserve original order it means that a client expect responses at exact same position
// as requests even if a request completely failed for a some reason. It's very unlikely situation, but still possible
// At this case, if we found a gap in responses, we put a default response with an error
?: NativeCall.CallResult(id, null, null, NativeCall.CallError(id, "No response", null, null), null, null, null)
?: NativeCall.CallResult(id, null, null, NativeCall.CallError(id, "No response", null, null), null, emptyList(), null)
}
}
.flatMapMany {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class WebsocketHandler(
}
Mono.just(response)
.map { Global.objectMapper.writeValueAsString(it) }
.doOnNext { eventHandler.onResponse(NativeCall.CallResult.ok(0, null, it.toByteArray(), null, null, null)) }
.doOnNext { eventHandler.onResponse(NativeCall.CallResult.ok(0, null, it.toByteArray(), null, emptyList(), null)) }
.doFinally { eventHandler.close() }
} else {
val eventHandler: AccessHandlerHttp.RequestHandler = eventHandlerFactory.call()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,20 @@ class QuorumRequestReader(
}
}

private fun resolvedBy() =
if (quorum.getResolvedBy().isEmpty()) null else quorum.getResolvedBy().last().getUpstreamSettingsData()
private fun resolvedBy(): List<Upstream.UpstreamSettingsData> {
if (quorum.getResolvedBy().isEmpty()) {
return emptyList()
}
return quorum.getResolvedBy().last().getUpstreamSettingsData()?.run { listOf(this) } ?: emptyList()
}

private fun noResponse(method: String, q: CallQuorum): Mono<Result> {
return apiControl.upstreamsMatchesResponse()?.run {
tracer.currentSpan()?.tag(SPAN_NO_RESPONSE_MESSAGE, getFullCause())
val cause = getCause(method) ?: return Mono.error(RpcException(1, "No response for method $method", getFullCause()))
if (cause.shouldReturnNull) {
Mono.just(
Result(Global.nullValue, null, 1, null, null),
Result(Global.nullValue, null, 1, emptyList(), null),
)
} else {
Mono.error(RpcException(1, "No response for method $method. Cause - ${cause.cause}"))
Expand Down
10 changes: 6 additions & 4 deletions src/main/kotlin/io/emeraldpay/dshackle/reader/BroadcastReader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,28 @@ class BroadcastReader(
val sig = getSignature(key, it.jsonRpcResponse, it.upstream.getId())
quorum.record(it.jsonRpcResponse, sig, it.upstream)
} else {
val err = ChainException(ChainResponse.NumberId(key.id), it.jsonRpcResponse.error!!, it.upstream.getUpstreamSettingsData())
val upData = it.upstream.getUpstreamSettingsData()?.run { listOf(this) } ?: emptyList()
val err = ChainException(ChainResponse.NumberId(key.id), it.jsonRpcResponse.error!!, upData)
quorum.record(err, null, it.upstream)
}
quorum
}.onErrorResume { err ->
log.error("Broadcast error: ${err.message}")
Mono.error(handleError(null, 0, null))
Mono.error(handleError(null, 0, emptyList()))
}.collectList()
.flatMap {
val upsData = quorum.getResolvedBy().mapNotNull { it.getUpstreamSettingsData() }
if (quorum.isResolved()) {
val res = Result(
quorum.getResponse()!!.getResult(),
quorum.getSignature(),
upstreams.size,
quorum.getResolvedBy().first().getUpstreamSettingsData(),
upsData,
null,
)
Mono.just(res)
} else {
Mono.error(handleError(quorum.getError(), key.id, null))
Mono.error(handleError(quorum.getError(), key.id, upsData))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ abstract class RequestReader(
)
}

protected fun handleError(error: ChainCallError?, id: Int, upstreamSettingsData: Upstream.UpstreamSettingsData?) =
error?.asException(ChainResponse.NumberId(id), upstreamSettingsData)
?: ChainException(ChainResponse.NumberId(id), ChainCallError(-32603, "Unhandled Upstream error"), upstreamSettingsData)
protected fun handleError(
error: ChainCallError?,
id: Int,
upstreamSettingsData: List<Upstream.UpstreamSettingsData>,
) = error?.asException(ChainResponse.NumberId(id), upstreamSettingsData)
?: ChainException(ChainResponse.NumberId(id), ChainCallError(-32603, "Unhandled Upstream error"), upstreamSettingsData)

protected fun getSignature(key: ChainRequest, response: ChainResponse, upstreamId: String) =
response.providedSignature
Expand All @@ -50,7 +53,7 @@ abstract class RequestReader(
val value: ByteArray,
val signature: ResponseSigner.Signature?,
val quorum: Int,
val resolvedUpstreamData: Upstream.UpstreamSettingsData?,
val resolvedUpstreamData: List<Upstream.UpstreamSettingsData>,
val stream: Flux<Chunk>?,
)
}
Expand Down
66 changes: 44 additions & 22 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,15 @@ open class NativeCall(
Flux.concat(
Mono.just(firstChunk)
.map {
buildStreamResult(it, callResult.id)
.setUpstreamId(callResult.upstreamSettingsData?.id)
.setUpstreamNodeVersion(callResult.upstreamSettingsData?.nodeVersion)
.build()
val result = buildStreamResult(it, callResult.id)
if (callResult.upstreamSettingsData.isNotEmpty()) {
getUpstreamIdsAndVersions(callResult.upstreamSettingsData)
.let { idsAndVersions ->
result.upstreamId = idsAndVersions.first
result.upstreamNodeVersion = idsAndVersions.second
}
}
result.build()
},
stream.skip(1).map { buildStreamResult(it, callResult.id).build() },
)
Expand Down Expand Up @@ -213,7 +218,7 @@ open class NativeCall(
val error = callContext.getError()

Mono.just(
CallResult(error.id, 0, null, error, null, null, null),
CallResult(error.id, 0, null, error, null, emptyList(), null),
)
}
}
Expand Down Expand Up @@ -242,9 +247,10 @@ open class NativeCall(
if (it.nonce != null && it.signature != null) {
result.signature = buildSignature(it.nonce, it.signature)
}
it.upstreamSettingsData?.let {
result.upstreamId = it.id
result.upstreamNodeVersion = it.nodeVersion
if (it.upstreamSettingsData.isNotEmpty()) {
val idsAndVersions = getUpstreamIdsAndVersions(it.upstreamSettingsData)
result.upstreamId = idsAndVersions.first
result.upstreamNodeVersion = idsAndVersions.second
}
it.finalization?.let {
result.finalization = Common.FinalizationData.newBuilder()
Expand Down Expand Up @@ -397,6 +403,13 @@ open class NativeCall(
}
}

private fun getUpstreamIdsAndVersions(upstreamSettingsData: List<Upstream.UpstreamSettingsData>): Pair<String, String> {
return Pair(
upstreamSettingsData.joinToString { it.id },
upstreamSettingsData.joinToString { it.nodeVersion },
)
}

private fun parsedCallDetails(item: BlockchainOuterClass.NativeCallItem): ParsedCallDetails {
return if (item.hasPayload()) {
ParsedCallDetails(item.method, extractParams(item.payload.toStringUtf8()))
Expand Down Expand Up @@ -432,10 +445,17 @@ open class NativeCall(
.read(ctx.payload.toChainRequest(ctx.nonce, ctx.forwardedSelector, false))
.map {
val result = it.getResult()
val resolvedUpstreamData = it.resolvedUpstreamData ?: ctx.upstream.getUpstreamSettingsData()
val resolvedUpstreamData = it.resolvedUpstreamData.ifEmpty {
ctx.upstream.getUpstreamSettingsData()?.run { listOf(this) } ?: emptyList()
}
validateResult(result, "local", ctx)
if (ctx.nonce != null) {
CallResult.ok(ctx.id, ctx.nonce, result, signer.sign(ctx.nonce, result, resolvedUpstreamData?.id ?: ctx.upstream.getId()), resolvedUpstreamData, ctx, it.finalization)
val source = if (resolvedUpstreamData.isNotEmpty()) {
resolvedUpstreamData[0].id
} else {
ctx.upstream.getId()
}
CallResult.ok(ctx.id, ctx.nonce, result, signer.sign(ctx.nonce, result, source), resolvedUpstreamData, ctx, it.finalization)
} else {
CallResult.ok(ctx.id, null, result, null, resolvedUpstreamData, ctx, it.finalization)
}
Expand All @@ -446,8 +466,8 @@ open class NativeCall(
.onErrorResume {
Mono.just(CallResult.fail(ctx.id, ctx.nonce, it, ctx))
}.doOnNext {
if (it.finalization != null && it.upstreamSettingsData != null) {
ctx.upstream.addFinalization(it.finalization, it.upstreamSettingsData.id)
if (it.finalization != null && it.upstreamSettingsData.isNotEmpty()) {
ctx.upstream.addFinalization(it.finalization, it.upstreamSettingsData[0].id)
}
}
}
Expand All @@ -465,7 +485,9 @@ open class NativeCall(
return SpannedReader(reader, tracer, RPC_READER)
.read(ctx.payload.toChainRequest(ctx.nonce, ctx.forwardedSelector, ctx.streamRequest))
.map {
val resolvedUpstreamData = it.resolvedUpstreamData ?: ctx.upstream.getUpstreamSettingsData()
val resolvedUpstreamData = it.resolvedUpstreamData.ifEmpty {
ctx.upstream.getUpstreamSettingsData()?.run { listOf(this) } ?: emptyList()
}
if (it.stream == null) {
val bytes = ctx.resultDecorator.processResult(it)
validateResult(bytes, "remote", ctx)
Expand Down Expand Up @@ -563,8 +585,8 @@ open class NativeCall(
}
override fun processResult(result: RequestReader.Result): ByteArray {
val bytes = result.value
if (bytes.last() == quoteCode && result.resolvedUpstreamData != null) {
val suffix = result.resolvedUpstreamData.nodeId
if (bytes.last() == quoteCode && result.resolvedUpstreamData.isNotEmpty()) {
val suffix = result.resolvedUpstreamData[0].nodeId
.toUByte()
.toString(16).padStart(2, padChar = '0').toByteArray()
bytes[bytes.lastIndex] = suffix.first()
Expand Down Expand Up @@ -678,7 +700,7 @@ open class NativeCall(
val message: String,
val upstreamError: ChainCallError?,
val data: String?,
val upstreamSettingsData: Upstream.UpstreamSettingsData? = null,
val upstreamSettingsData: List<Upstream.UpstreamSettingsData> = emptyList(),
) {

companion object {
Expand Down Expand Up @@ -719,7 +741,7 @@ open class NativeCall(
val result: ByteArray?,
val error: CallError?,
val signature: ResponseSigner.Signature?,
val upstreamSettingsData: Upstream.UpstreamSettingsData?,
val upstreamSettingsData: List<Upstream.UpstreamSettingsData>,
val ctx: ValidCallContext<ParsedCallDetails>?,
val stream: Flux<Chunk>? = null,
val finalization: FinalizationData? = null,
Expand All @@ -732,23 +754,23 @@ open class NativeCall(
callError: CallError?,
signature: ResponseSigner.Signature?,
ctx: ValidCallContext<ParsedCallDetails>?,
) : this(id, nonce, result, callError, signature, callError?.upstreamSettingsData, ctx)
) : this(id, nonce, result, callError, signature, callError?.upstreamSettingsData ?: emptyList(), ctx)

companion object {
fun ok(id: Int, nonce: Long?, result: ByteArray, signature: ResponseSigner.Signature?, upstreamSettingsData: Upstream.UpstreamSettingsData?, ctx: ValidCallContext<ParsedCallDetails>?): CallResult {
fun ok(id: Int, nonce: Long?, result: ByteArray, signature: ResponseSigner.Signature?, upstreamSettingsData: List<Upstream.UpstreamSettingsData>, ctx: ValidCallContext<ParsedCallDetails>?): CallResult {
return CallResult(id, nonce, result, null, signature, upstreamSettingsData, ctx)
}

fun ok(id: Int, nonce: Long?, result: ByteArray, signature: ResponseSigner.Signature?, upstreamSettingsData: Upstream.UpstreamSettingsData?, ctx: ValidCallContext<ParsedCallDetails>?, final: FinalizationData?): CallResult {
fun ok(id: Int, nonce: Long?, result: ByteArray, signature: ResponseSigner.Signature?, upstreamSettingsData: List<Upstream.UpstreamSettingsData>, ctx: ValidCallContext<ParsedCallDetails>?, final: FinalizationData?): CallResult {
return CallResult(id, nonce, result, null, signature, upstreamSettingsData, ctx, null, final)
}

fun ok(id: Int, nonce: Long?, result: ByteArray, signature: ResponseSigner.Signature?, upstreamSettingsData: Upstream.UpstreamSettingsData?, ctx: ValidCallContext<ParsedCallDetails>?, stream: Flux<Chunk>?): CallResult {
fun ok(id: Int, nonce: Long?, result: ByteArray, signature: ResponseSigner.Signature?, upstreamSettingsData: List<Upstream.UpstreamSettingsData>, ctx: ValidCallContext<ParsedCallDetails>?, stream: Flux<Chunk>?): CallResult {
return CallResult(id, nonce, result, null, signature, upstreamSettingsData, ctx, stream)
}

fun fail(id: Int, nonce: Long?, error: CallError, ctx: ValidCallContext<ParsedCallDetails>?): CallResult {
return CallResult(id, nonce, null, error, null, null, ctx)
return CallResult(id, nonce, null, error, null, emptyList(), ctx)
}

fun fail(id: Int, nonce: Long?, error: Throwable, ctx: ValidCallContext<ParsedCallDetails>?): CallResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ data class ChainCallError(val code: Int, val message: String, val details: Any?)
return ChainCallUpstreamException(id ?: ChainResponse.NumberId(-1), this)
}

fun asException(id: ChainResponse.Id?, upstreamSettingsData: Upstream.UpstreamSettingsData?): ChainException {
fun asException(id: ChainResponse.Id?, upstreamSettingsData: List<Upstream.UpstreamSettingsData>): ChainException {
return ChainException(id ?: ChainResponse.NumberId(-1), this, upstreamSettingsData, false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ package io.emeraldpay.dshackle.upstream
class ChainCallUpstreamException(
id: ChainResponse.Id,
error: ChainCallError,
) : ChainException(id, error, null, false)
) : ChainException(id, error, emptyList(), false)
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException
open class ChainException(
val id: ChainResponse.Id,
val error: ChainCallError,
val upstreamSettingsData: Upstream.UpstreamSettingsData? = null,
val upstreamSettingsData: List<Upstream.UpstreamSettingsData> = emptyList(),
writableStackTrace: Boolean = true,
cause: Throwable? = null,
) : Exception(error.message, cause, true, writableStackTrace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,18 @@ class ChainResponse @JvmOverloads constructor(
* When making a request through Dshackle protocol a remote may provide its signature with the response, which we keep here
*/
val providedSignature: ResponseSigner.Signature? = null,
val resolvedUpstreamData: Upstream.UpstreamSettingsData? = null,
val resolvedUpstreamData: List<Upstream.UpstreamSettingsData> = emptyList(),
val finalization: FinalizationData? = null,
) {

constructor(stream: Flux<Chunk>, id: Int) :
this(null, null, NumberId(id.toLong()), stream, null, null, null)
this(null, null, NumberId(id.toLong()), stream, null, emptyList(), null)

constructor(result: ByteArray?, error: ChainCallError?) : this(result, error, NumberId(0), null, null)

constructor(result: ByteArray?, error: ChainCallError?, resolvedUpstreamData: Upstream.UpstreamSettingsData?) :
constructor(result: ByteArray?, error: ChainCallError?, resolvedUpstreamData: List<Upstream.UpstreamSettingsData>) :
this(result, error, NumberId(0), null, null, resolvedUpstreamData, null)
constructor(result: ByteArray?, resolvedUpstreamData: Upstream.UpstreamSettingsData?, finalization: FinalizationData) :
constructor(result: ByteArray?, resolvedUpstreamData: List<Upstream.UpstreamSettingsData>, finalization: FinalizationData) :
this(result, null, NumberId(0), null, null, resolvedUpstreamData, finalization)

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ open class EthereumCachingReader(
) : Reader<K, Result<D>> {
override fun read(key: K): Mono<Result<D>> {
return reader.read(key)
.map { Result(it, null) }
.map { Result(it, emptyList()) }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,6 @@ class EthereumDirectReader(

data class Result<T>(
val data: T,
val resolvedUpstreamData: Upstream.UpstreamSettingsData?,
val resolvedUpstreamData: List<Upstream.UpstreamSettingsData>,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class EthereumLocalReader(
return commonRequests(key)?.switchIfEmpty {
// we need to explicitly return null to prevent executeOnRemote
// for example
Mono.just(ChainResponse(nullValue, null, null))
Mono.just(ChainResponse(nullValue, null, emptyList()))
} ?: Mono.empty()
}

Expand Down Expand Up @@ -244,7 +244,7 @@ class EthereumLocalReader(
}

return logsOracle.estimate(limit?.toLong(), fromBlock, toBlock, address, topics)
.map { ChainResponse(it.toByteArray(), null, null) }
.map { ChainResponse(it.toByteArray(), null, emptyList()) }
}

private fun parseBlockRef(blockRef: String?): Long {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ open class WsConnectionImpl(
RpcResponseError.CODE_INTERNAL_ERROR,
"Response not received from WebSocket",
),
null,
emptyList(),
false,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,16 @@ class JsonRpcGrpcClient(
} else {
null
}
Mono.just(ChainResponse(bytes, null, ChainResponse.NumberId(0), null, signature, Upstream.UpstreamSettingsData(0, resp.upstreamId, resp.upstreamNodeVersion)))
Mono.just(
ChainResponse(
bytes,
null,
ChainResponse.NumberId(0),
null,
signature,
listOf(Upstream.UpstreamSettingsData(0, resp.upstreamId, resp.upstreamNodeVersion)),
),
)
} else {
metrics?.fails?.increment()
Mono.error(
Expand Down
Loading

0 comments on commit ee8181e

Please sign in to comment.