Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass all upstreams that resolved a request #531

Merged
merged 1 commit into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ abstract class BaseHandler(
// If Proxy is configured to preserve original order it means that a client expect responses at exact same position
// as requests even if a request completely failed for a some reason. It's very unlikely situation, but still possible
// At this case, if we found a gap in responses, we put a default response with an error
?: NativeCall.CallResult(id, null, null, NativeCall.CallError(id, "No response", null, null), null, null, null)
?: NativeCall.CallResult(id, null, null, NativeCall.CallError(id, "No response", null, null), null, emptyList(), null)
}
}
.flatMapMany {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class WebsocketHandler(
}
Mono.just(response)
.map { Global.objectMapper.writeValueAsString(it) }
.doOnNext { eventHandler.onResponse(NativeCall.CallResult.ok(0, null, it.toByteArray(), null, null, null)) }
.doOnNext { eventHandler.onResponse(NativeCall.CallResult.ok(0, null, it.toByteArray(), null, emptyList(), null)) }
.doFinally { eventHandler.close() }
} else {
val eventHandler: AccessHandlerHttp.RequestHandler = eventHandlerFactory.call()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,20 @@ class QuorumRequestReader(
}
}

private fun resolvedBy() =
if (quorum.getResolvedBy().isEmpty()) null else quorum.getResolvedBy().last().getUpstreamSettingsData()
private fun resolvedBy(): List<Upstream.UpstreamSettingsData> {
if (quorum.getResolvedBy().isEmpty()) {
return emptyList()
}
return quorum.getResolvedBy().last().getUpstreamSettingsData()?.run { listOf(this) } ?: emptyList()
}

private fun noResponse(method: String, q: CallQuorum): Mono<Result> {
return apiControl.upstreamsMatchesResponse()?.run {
tracer.currentSpan()?.tag(SPAN_NO_RESPONSE_MESSAGE, getFullCause())
val cause = getCause(method) ?: return Mono.error(RpcException(1, "No response for method $method", getFullCause()))
if (cause.shouldReturnNull) {
Mono.just(
Result(Global.nullValue, null, 1, null, null),
Result(Global.nullValue, null, 1, emptyList(), null),
)
} else {
Mono.error(RpcException(1, "No response for method $method. Cause - ${cause.cause}"))
Expand Down
10 changes: 6 additions & 4 deletions src/main/kotlin/io/emeraldpay/dshackle/reader/BroadcastReader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,28 @@ class BroadcastReader(
val sig = getSignature(key, it.jsonRpcResponse, it.upstream.getId())
quorum.record(it.jsonRpcResponse, sig, it.upstream)
} else {
val err = ChainException(ChainResponse.NumberId(key.id), it.jsonRpcResponse.error!!, it.upstream.getUpstreamSettingsData())
val upData = it.upstream.getUpstreamSettingsData()?.run { listOf(this) } ?: emptyList()
val err = ChainException(ChainResponse.NumberId(key.id), it.jsonRpcResponse.error!!, upData)
quorum.record(err, null, it.upstream)
}
quorum
}.onErrorResume { err ->
log.error("Broadcast error: ${err.message}")
Mono.error(handleError(null, 0, null))
Mono.error(handleError(null, 0, emptyList()))
}.collectList()
.flatMap {
val upsData = quorum.getResolvedBy().mapNotNull { it.getUpstreamSettingsData() }
if (quorum.isResolved()) {
val res = Result(
quorum.getResponse()!!.getResult(),
quorum.getSignature(),
upstreams.size,
quorum.getResolvedBy().first().getUpstreamSettingsData(),
upsData,
null,
)
Mono.just(res)
} else {
Mono.error(handleError(quorum.getError(), key.id, null))
Mono.error(handleError(quorum.getError(), key.id, upsData))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ abstract class RequestReader(
)
}

protected fun handleError(error: ChainCallError?, id: Int, upstreamSettingsData: Upstream.UpstreamSettingsData?) =
error?.asException(ChainResponse.NumberId(id), upstreamSettingsData)
?: ChainException(ChainResponse.NumberId(id), ChainCallError(-32603, "Unhandled Upstream error"), upstreamSettingsData)
protected fun handleError(
error: ChainCallError?,
id: Int,
upstreamSettingsData: List<Upstream.UpstreamSettingsData>,
) = error?.asException(ChainResponse.NumberId(id), upstreamSettingsData)
?: ChainException(ChainResponse.NumberId(id), ChainCallError(-32603, "Unhandled Upstream error"), upstreamSettingsData)

protected fun getSignature(key: ChainRequest, response: ChainResponse, upstreamId: String) =
response.providedSignature
Expand All @@ -50,7 +53,7 @@ abstract class RequestReader(
val value: ByteArray,
val signature: ResponseSigner.Signature?,
val quorum: Int,
val resolvedUpstreamData: Upstream.UpstreamSettingsData?,
val resolvedUpstreamData: List<Upstream.UpstreamSettingsData>,
val stream: Flux<Chunk>?,
)
}
Expand Down
66 changes: 44 additions & 22 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,15 @@ open class NativeCall(
Flux.concat(
Mono.just(firstChunk)
.map {
buildStreamResult(it, callResult.id)
.setUpstreamId(callResult.upstreamSettingsData?.id)
.setUpstreamNodeVersion(callResult.upstreamSettingsData?.nodeVersion)
.build()
val result = buildStreamResult(it, callResult.id)
if (callResult.upstreamSettingsData.isNotEmpty()) {
getUpstreamIdsAndVersions(callResult.upstreamSettingsData)
.let { idsAndVersions ->
result.upstreamId = idsAndVersions.first
result.upstreamNodeVersion = idsAndVersions.second
}
}
result.build()
},
stream.skip(1).map { buildStreamResult(it, callResult.id).build() },
)
Expand Down Expand Up @@ -213,7 +218,7 @@ open class NativeCall(
val error = callContext.getError()

Mono.just(
CallResult(error.id, 0, null, error, null, null, null),
CallResult(error.id, 0, null, error, null, emptyList(), null),
)
}
}
Expand Down Expand Up @@ -242,9 +247,10 @@ open class NativeCall(
if (it.nonce != null && it.signature != null) {
result.signature = buildSignature(it.nonce, it.signature)
}
it.upstreamSettingsData?.let {
result.upstreamId = it.id
result.upstreamNodeVersion = it.nodeVersion
if (it.upstreamSettingsData.isNotEmpty()) {
val idsAndVersions = getUpstreamIdsAndVersions(it.upstreamSettingsData)
result.upstreamId = idsAndVersions.first
result.upstreamNodeVersion = idsAndVersions.second
}
it.finalization?.let {
result.finalization = Common.FinalizationData.newBuilder()
Expand Down Expand Up @@ -397,6 +403,13 @@ open class NativeCall(
}
}

private fun getUpstreamIdsAndVersions(upstreamSettingsData: List<Upstream.UpstreamSettingsData>): Pair<String, String> {
return Pair(
upstreamSettingsData.joinToString { it.id },
upstreamSettingsData.joinToString { it.nodeVersion },
)
}

private fun parsedCallDetails(item: BlockchainOuterClass.NativeCallItem): ParsedCallDetails {
return if (item.hasPayload()) {
ParsedCallDetails(item.method, extractParams(item.payload.toStringUtf8()))
Expand Down Expand Up @@ -432,10 +445,17 @@ open class NativeCall(
.read(ctx.payload.toChainRequest(ctx.nonce, ctx.forwardedSelector, false))
.map {
val result = it.getResult()
val resolvedUpstreamData = it.resolvedUpstreamData ?: ctx.upstream.getUpstreamSettingsData()
val resolvedUpstreamData = it.resolvedUpstreamData.ifEmpty {
ctx.upstream.getUpstreamSettingsData()?.run { listOf(this) } ?: emptyList()
}
validateResult(result, "local", ctx)
if (ctx.nonce != null) {
CallResult.ok(ctx.id, ctx.nonce, result, signer.sign(ctx.nonce, result, resolvedUpstreamData?.id ?: ctx.upstream.getId()), resolvedUpstreamData, ctx, it.finalization)
val source = if (resolvedUpstreamData.isNotEmpty()) {
resolvedUpstreamData[0].id
} else {
ctx.upstream.getId()
}
CallResult.ok(ctx.id, ctx.nonce, result, signer.sign(ctx.nonce, result, source), resolvedUpstreamData, ctx, it.finalization)
} else {
CallResult.ok(ctx.id, null, result, null, resolvedUpstreamData, ctx, it.finalization)
}
Expand All @@ -446,8 +466,8 @@ open class NativeCall(
.onErrorResume {
Mono.just(CallResult.fail(ctx.id, ctx.nonce, it, ctx))
}.doOnNext {
if (it.finalization != null && it.upstreamSettingsData != null) {
ctx.upstream.addFinalization(it.finalization, it.upstreamSettingsData.id)
if (it.finalization != null && it.upstreamSettingsData.isNotEmpty()) {
ctx.upstream.addFinalization(it.finalization, it.upstreamSettingsData[0].id)
}
}
}
Expand All @@ -465,7 +485,9 @@ open class NativeCall(
return SpannedReader(reader, tracer, RPC_READER)
.read(ctx.payload.toChainRequest(ctx.nonce, ctx.forwardedSelector, ctx.streamRequest))
.map {
val resolvedUpstreamData = it.resolvedUpstreamData ?: ctx.upstream.getUpstreamSettingsData()
val resolvedUpstreamData = it.resolvedUpstreamData.ifEmpty {
ctx.upstream.getUpstreamSettingsData()?.run { listOf(this) } ?: emptyList()
}
if (it.stream == null) {
val bytes = ctx.resultDecorator.processResult(it)
validateResult(bytes, "remote", ctx)
Expand Down Expand Up @@ -563,8 +585,8 @@ open class NativeCall(
}
override fun processResult(result: RequestReader.Result): ByteArray {
val bytes = result.value
if (bytes.last() == quoteCode && result.resolvedUpstreamData != null) {
val suffix = result.resolvedUpstreamData.nodeId
if (bytes.last() == quoteCode && result.resolvedUpstreamData.isNotEmpty()) {
val suffix = result.resolvedUpstreamData[0].nodeId
.toUByte()
.toString(16).padStart(2, padChar = '0').toByteArray()
bytes[bytes.lastIndex] = suffix.first()
Expand Down Expand Up @@ -678,7 +700,7 @@ open class NativeCall(
val message: String,
val upstreamError: ChainCallError?,
val data: String?,
val upstreamSettingsData: Upstream.UpstreamSettingsData? = null,
val upstreamSettingsData: List<Upstream.UpstreamSettingsData> = emptyList(),
) {

companion object {
Expand Down Expand Up @@ -719,7 +741,7 @@ open class NativeCall(
val result: ByteArray?,
val error: CallError?,
val signature: ResponseSigner.Signature?,
val upstreamSettingsData: Upstream.UpstreamSettingsData?,
val upstreamSettingsData: List<Upstream.UpstreamSettingsData>,
val ctx: ValidCallContext<ParsedCallDetails>?,
val stream: Flux<Chunk>? = null,
val finalization: FinalizationData? = null,
Expand All @@ -732,23 +754,23 @@ open class NativeCall(
callError: CallError?,
signature: ResponseSigner.Signature?,
ctx: ValidCallContext<ParsedCallDetails>?,
) : this(id, nonce, result, callError, signature, callError?.upstreamSettingsData, ctx)
) : this(id, nonce, result, callError, signature, callError?.upstreamSettingsData ?: emptyList(), ctx)

companion object {
fun ok(id: Int, nonce: Long?, result: ByteArray, signature: ResponseSigner.Signature?, upstreamSettingsData: Upstream.UpstreamSettingsData?, ctx: ValidCallContext<ParsedCallDetails>?): CallResult {
fun ok(id: Int, nonce: Long?, result: ByteArray, signature: ResponseSigner.Signature?, upstreamSettingsData: List<Upstream.UpstreamSettingsData>, ctx: ValidCallContext<ParsedCallDetails>?): CallResult {
return CallResult(id, nonce, result, null, signature, upstreamSettingsData, ctx)
}

fun ok(id: Int, nonce: Long?, result: ByteArray, signature: ResponseSigner.Signature?, upstreamSettingsData: Upstream.UpstreamSettingsData?, ctx: ValidCallContext<ParsedCallDetails>?, final: FinalizationData?): CallResult {
fun ok(id: Int, nonce: Long?, result: ByteArray, signature: ResponseSigner.Signature?, upstreamSettingsData: List<Upstream.UpstreamSettingsData>, ctx: ValidCallContext<ParsedCallDetails>?, final: FinalizationData?): CallResult {
return CallResult(id, nonce, result, null, signature, upstreamSettingsData, ctx, null, final)
}

fun ok(id: Int, nonce: Long?, result: ByteArray, signature: ResponseSigner.Signature?, upstreamSettingsData: Upstream.UpstreamSettingsData?, ctx: ValidCallContext<ParsedCallDetails>?, stream: Flux<Chunk>?): CallResult {
fun ok(id: Int, nonce: Long?, result: ByteArray, signature: ResponseSigner.Signature?, upstreamSettingsData: List<Upstream.UpstreamSettingsData>, ctx: ValidCallContext<ParsedCallDetails>?, stream: Flux<Chunk>?): CallResult {
return CallResult(id, nonce, result, null, signature, upstreamSettingsData, ctx, stream)
}

fun fail(id: Int, nonce: Long?, error: CallError, ctx: ValidCallContext<ParsedCallDetails>?): CallResult {
return CallResult(id, nonce, null, error, null, null, ctx)
return CallResult(id, nonce, null, error, null, emptyList(), ctx)
}

fun fail(id: Int, nonce: Long?, error: Throwable, ctx: ValidCallContext<ParsedCallDetails>?): CallResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ data class ChainCallError(val code: Int, val message: String, val details: Any?)
return ChainCallUpstreamException(id ?: ChainResponse.NumberId(-1), this)
}

fun asException(id: ChainResponse.Id?, upstreamSettingsData: Upstream.UpstreamSettingsData?): ChainException {
fun asException(id: ChainResponse.Id?, upstreamSettingsData: List<Upstream.UpstreamSettingsData>): ChainException {
return ChainException(id ?: ChainResponse.NumberId(-1), this, upstreamSettingsData, false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ package io.emeraldpay.dshackle.upstream
class ChainCallUpstreamException(
id: ChainResponse.Id,
error: ChainCallError,
) : ChainException(id, error, null, false)
) : ChainException(id, error, emptyList(), false)
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException
open class ChainException(
val id: ChainResponse.Id,
val error: ChainCallError,
val upstreamSettingsData: Upstream.UpstreamSettingsData? = null,
val upstreamSettingsData: List<Upstream.UpstreamSettingsData> = emptyList(),
writableStackTrace: Boolean = true,
cause: Throwable? = null,
) : Exception(error.message, cause, true, writableStackTrace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,18 @@ class ChainResponse @JvmOverloads constructor(
* When making a request through Dshackle protocol a remote may provide its signature with the response, which we keep here
*/
val providedSignature: ResponseSigner.Signature? = null,
val resolvedUpstreamData: Upstream.UpstreamSettingsData? = null,
val resolvedUpstreamData: List<Upstream.UpstreamSettingsData> = emptyList(),
val finalization: FinalizationData? = null,
) {

constructor(stream: Flux<Chunk>, id: Int) :
this(null, null, NumberId(id.toLong()), stream, null, null, null)
this(null, null, NumberId(id.toLong()), stream, null, emptyList(), null)

constructor(result: ByteArray?, error: ChainCallError?) : this(result, error, NumberId(0), null, null)

constructor(result: ByteArray?, error: ChainCallError?, resolvedUpstreamData: Upstream.UpstreamSettingsData?) :
constructor(result: ByteArray?, error: ChainCallError?, resolvedUpstreamData: List<Upstream.UpstreamSettingsData>) :
this(result, error, NumberId(0), null, null, resolvedUpstreamData, null)
constructor(result: ByteArray?, resolvedUpstreamData: Upstream.UpstreamSettingsData?, finalization: FinalizationData) :
constructor(result: ByteArray?, resolvedUpstreamData: List<Upstream.UpstreamSettingsData>, finalization: FinalizationData) :
this(result, null, NumberId(0), null, null, resolvedUpstreamData, finalization)

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ open class EthereumCachingReader(
) : Reader<K, Result<D>> {
override fun read(key: K): Mono<Result<D>> {
return reader.read(key)
.map { Result(it, null) }
.map { Result(it, emptyList()) }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,6 @@ class EthereumDirectReader(

data class Result<T>(
val data: T,
val resolvedUpstreamData: Upstream.UpstreamSettingsData?,
val resolvedUpstreamData: List<Upstream.UpstreamSettingsData>,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class EthereumLocalReader(
return commonRequests(key)?.switchIfEmpty {
// we need to explicitly return null to prevent executeOnRemote
// for example
Mono.just(ChainResponse(nullValue, null, null))
Mono.just(ChainResponse(nullValue, null, emptyList()))
} ?: Mono.empty()
}

Expand Down Expand Up @@ -244,7 +244,7 @@ class EthereumLocalReader(
}

return logsOracle.estimate(limit?.toLong(), fromBlock, toBlock, address, topics)
.map { ChainResponse(it.toByteArray(), null, null) }
.map { ChainResponse(it.toByteArray(), null, emptyList()) }
}

private fun parseBlockRef(blockRef: String?): Long {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ open class WsConnectionImpl(
RpcResponseError.CODE_INTERNAL_ERROR,
"Response not received from WebSocket",
),
null,
emptyList(),
false,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,16 @@ class JsonRpcGrpcClient(
} else {
null
}
Mono.just(ChainResponse(bytes, null, ChainResponse.NumberId(0), null, signature, Upstream.UpstreamSettingsData(0, resp.upstreamId, resp.upstreamNodeVersion)))
Mono.just(
ChainResponse(
bytes,
null,
ChainResponse.NumberId(0),
null,
signature,
listOf(Upstream.UpstreamSettingsData(0, resp.upstreamId, resp.upstreamNodeVersion)),
),
)
} else {
metrics?.fails?.increment()
Mono.error(
Expand Down
Loading
Loading