Skip to content

Commit

Permalink
Chunk huge responses (#292)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Aug 30, 2023
1 parent 04acc7f commit b43d282
Show file tree
Hide file tree
Showing 11 changed files with 515 additions and 13 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ dependencies {
testImplementation libs.groovy
testImplementation libs.bundles.testcontainers
testImplementation libs.bundles.junit
testImplementation libs.mockito.kotlin

testImplementation(libs.spring.boot.starter.test) {
exclude module: 'spring-boot-starter-logging'
Expand Down
2 changes: 1 addition & 1 deletion emerald-grpc
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ micrometer-registry-prometheus = "io.micrometer:micrometer-registry-prometheus:1

mockserver-netty = "org.mock-server:mockserver-netty:5.11.2"

mockito-kotlin = "org.mockito.kotlin:mockito-kotlin:5.1.0"

netty-common = { module = "io.netty:netty-common", version.ref = "netty" }
netty-transport = { module = "io.netty:netty-transport", version.ref = "netty" }
netty-handler-core = { module = "io.netty:netty-handler", version.ref = "netty" }
Expand Down
4 changes: 2 additions & 2 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/BlockchainRpc.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import java.util.concurrent.TimeUnit
@Service
@DependsOn("monitoringSetup")
class BlockchainRpc(
private val nativeCall: NativeCall,
private val nativeCallStream: NativeCallStream,
private val nativeSubscribe: NativeSubscribe,
private val streamHead: StreamHead,
private val trackTx: List<TrackTx>,
Expand Down Expand Up @@ -78,7 +78,7 @@ class BlockchainRpc(
var startTime = 0L
var metrics: RequestMetrics? = null
val idsMap = mutableMapOf<Int, String>()
return nativeCall.nativeCall(
return nativeCallStream.nativeCall(
request
.subscribeOn(scheduler)
.doOnNext { req ->
Expand Down
69 changes: 69 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCallStream.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.emeraldpay.dshackle.rpc

import com.google.protobuf.ByteString
import io.emeraldpay.api.proto.BlockchainOuterClass.NativeCallReplyItem
import io.emeraldpay.api.proto.BlockchainOuterClass.NativeCallRequest
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import kotlin.math.min

@Service
class NativeCallStream(
private val nativeCall: NativeCall,
) {

fun nativeCall(
requestMono: Mono<NativeCallRequest>
): Flux<NativeCallReplyItem> {
return requestMono.flatMapMany { req ->
nativeCall.nativeCall(Mono.just(req))
.map { StreamNativeResult(it, req.chunkSize) }
.transform {
if (!req.sorted || req.itemsList.size == 1) {
it
} else {
it.sort { o1, o2 -> o1.response.id - o2.response.id }
}
}
}.concatMap {
val chunkSize = it.chunkSize
val response = it.response
if (chunkSize == 0 || response.payload.size() <= chunkSize || !response.succeed) {
Mono.just(response)
} else {
Flux.fromIterable(chunks(response, chunkSize))
}
}
}

private fun chunks(response: NativeCallReplyItem, chunkSize: Int): List<NativeCallReplyItem> {
val chunks = mutableListOf<ByteString>()
val responseBytes = response.payload

for (i in 0 until responseBytes.size() step+chunkSize) {
chunks.add(responseBytes.substring(i, min(i + chunkSize, responseBytes.size())))
}

return chunks
.mapIndexed { index, bytes ->
NativeCallReplyItem.newBuilder()
.apply {
id = response.id
payload = bytes
succeed = true
upstreamId = response.upstreamId
chunked = true
finalChunk = index == chunks.size - 1
if (this.finalChunk && response.hasSignature()) {
signature = response.signature
}
}.build()
}
}

private data class StreamNativeResult(
val response: NativeCallReplyItem,
val chunkSize: Int
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,6 @@ class SubscribeNodeStatus(

private fun buildStatus(status: UpstreamAvailability, height: Long?): NodeStatus.Builder =
NodeStatus.newBuilder()
.setAvailability(BlockchainOuterClass.AvailabilityEnum.forNumber(status.grpcId))
.setAvailability(Common.AvailabilityEnum.forNumber(status.grpcId))
.setCurrentHeight(height ?: 0)
}
4 changes: 2 additions & 2 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeStatus.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class SubscribeStatus(

fun chainUnavailable(chain: Chain): BlockchainOuterClass.ChainStatus {
return BlockchainOuterClass.ChainStatus.newBuilder()
.setAvailability(BlockchainOuterClass.AvailabilityEnum.AVAIL_UNAVAILABLE)
.setAvailability(Common.AvailabilityEnum.AVAIL_UNAVAILABLE)
.setChain(Common.ChainRef.forNumber(chain.id))
.setQuorum(0)
.build()
Expand All @@ -66,7 +66,7 @@ class SubscribeStatus(
0
}
return BlockchainOuterClass.ChainStatus.newBuilder()
.setAvailability(BlockchainOuterClass.AvailabilityEnum.forNumber(available.grpcId))
.setAvailability(Common.AvailabilityEnum.forNumber(available.grpcId))
.setChain(Common.ChainRef.forNumber(chain.id))
.setQuorum(quorum)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class SubscribeStatusSpec extends Specification {
then:
StepVerifier.create(act)
.expectNextMatches {
it.chainValue == Chain.ETHEREUM__MAINNET.id && it.availability == BlockchainOuterClass.AvailabilityEnum.AVAIL_OK
it.chainValue == Chain.ETHEREUM__MAINNET.id && it.availability == Common.AvailabilityEnum.AVAIL_OK
}
.expectComplete()
.verify(Duration.ofSeconds(3))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.emeraldpay.dshackle.upstream

import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.api.proto.Common
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.cache.Caches
import io.emeraldpay.dshackle.config.UpstreamsConfig
Expand Down Expand Up @@ -260,17 +261,17 @@ class MultistreamSpec extends Specification {
ms.onUpstreamChange(
new UpstreamChangeEvent(Chain.ETHEREUM__MAINNET, up2, UpstreamChangeEvent.ChangeType.ADDED)
)
up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_UNAVAILABLE))
up2.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK))
up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK))
up1.onStatus(status(Common.AvailabilityEnum.AVAIL_UNAVAILABLE))
up2.onStatus(status(Common.AvailabilityEnum.AVAIL_OK))
up1.onStatus(status(Common.AvailabilityEnum.AVAIL_OK))
then:
assert ms.getMethods().supportedMethods == Set.of("eth_test1", "eth_test2", "eth_test3")
when:
up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_SYNCING))
up1.onStatus(status(Common.AvailabilityEnum.AVAIL_SYNCING))
then:
assert ms.getMethods().supportedMethods == Set.of("eth_test1", "eth_test2")
when:
up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK))
up1.onStatus(status(Common.AvailabilityEnum.AVAIL_OK))
then:
assert ms.getMethods().supportedMethods == Set.of("eth_test1", "eth_test2", "eth_test3")
}
Expand Down Expand Up @@ -305,7 +306,7 @@ class MultistreamSpec extends Specification {
.verify(Duration.ofSeconds(3))
}

private BlockchainOuterClass.ChainStatus status(BlockchainOuterClass.AvailabilityEnum status) {
private BlockchainOuterClass.ChainStatus status(Common.AvailabilityEnum status) {
return BlockchainOuterClass.ChainStatus.newBuilder()
.setAvailability(status)
.build()
Expand Down
173 changes: 173 additions & 0 deletions src/test/kotlin/io/emeraldpay/dshackle/rpc/NativeCallStreamTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package io.emeraldpay.dshackle.rpc

import com.fasterxml.jackson.databind.JsonNode
import com.google.protobuf.ByteString
import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.api.proto.BlockchainOuterClass.NativeCallRequest
import io.emeraldpay.dshackle.Global
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.mockito.kotlin.any
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.mock
import org.springframework.util.ResourceUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.test.StepVerifier
import java.time.Duration

class NativeCallStreamTest {
private val upstreamId = "upstreamId"
private val mapper = Global.objectMapper

@Test
fun `streaming response is equal to the original response`() {
val responseFile = ResourceUtils.getFile("classpath:responses/get-by-number-response.json")
val response = mapper.writeValueAsBytes(mapper.readValue(responseFile, JsonNode::class.java))
val nativeCallResponse = BlockchainOuterClass.NativeCallReplyItem.newBuilder()
.setId(1)
.setSucceed(true)
.setUpstreamId(upstreamId)
.setPayload(ByteString.copyFrom(response))
.build()
val nativeCallMock = mock<NativeCall> {
on { nativeCall(any()) } doReturn Flux.just(nativeCallResponse)
}
val nativeCallStream = NativeCallStream(nativeCallMock)
val req = Mono.just(
NativeCallRequest.newBuilder()
.setChunkSize(1000)
.build()
)

val result = nativeCallStream.nativeCall(req)
.collectList()
.block()!!
.map { it.payload.toByteArray() }
.reduce { acc, bytes -> acc.plus(bytes) }

assertTrue(response.contentEquals(result))
}

@Test
fun `streaming responses is correct`() {
val response = "\"0x1126938\"".toByteArray()
val nativeCallResponse = BlockchainOuterClass.NativeCallReplyItem.newBuilder()
.setId(15)
.setSucceed(true)
.setUpstreamId(upstreamId)
.setPayload(ByteString.copyFrom(response))
.build()
val nativeCallMock = mock<NativeCall> {
on { nativeCall(any()) } doReturn Flux.just(nativeCallResponse)
}
val nativeCallStream = NativeCallStream(nativeCallMock)
val req = Mono.just(
NativeCallRequest.newBuilder()
.setChunkSize(5)
.build()
)

val chunkResponse: (Int) -> BlockchainOuterClass.NativeCallReplyItem.Builder = { id ->
BlockchainOuterClass.NativeCallReplyItem.newBuilder()
.setId(id)
.setChunked(true)
.setSucceed(true)
.setUpstreamId(upstreamId)
}

val result = nativeCallStream.nativeCall(req)

StepVerifier.create(result)
.expectNext(
chunkResponse(15)
.setPayload(ByteString.copyFrom("\"0x11".toByteArray()))
.build()
)
.expectNext(
chunkResponse(15)
.setPayload(ByteString.copyFrom("26938".toByteArray()))
.build()
)
.expectNext(
chunkResponse(15)
.setFinalChunk(true)
.setPayload(ByteString.copyFrom("\"".toByteArray()))
.build()
)
.expectComplete()
.verify(Duration.ofSeconds(3))
}

@Test
fun `no streaming if response is too small`() {
val response = "\"0x1\"".toByteArray()
val nativeCallResponse = BlockchainOuterClass.NativeCallReplyItem.newBuilder()
.setId(15)
.setSucceed(true)
.setUpstreamId(upstreamId)
.setPayload(ByteString.copyFrom(response))
.build()
val nativeCallMock = mock<NativeCall> {
on { nativeCall(any()) } doReturn Flux.just(nativeCallResponse)
}
val nativeCallStream = NativeCallStream(nativeCallMock)
val req = Mono.just(
NativeCallRequest.newBuilder()
.setChunkSize(1000)
.build()
)

val result = nativeCallStream.nativeCall(req)

StepVerifier.create(result)
.expectNext(
nativeCallResponse
)
.expectComplete()
.verify(Duration.ofSeconds(3))
}

@Test
fun `sort responses by request id is correct`() {
val response = "\"0x1\"".toByteArray()
val response2 = "\"0x2\"".toByteArray()
val response3 = "\"0x3\"".toByteArray()

val nativeCallResponse: (Int, ByteArray) -> BlockchainOuterClass.NativeCallReplyItem = { id, resp ->
BlockchainOuterClass.NativeCallReplyItem.newBuilder()
.setId(id)
.setChunked(true)
.setSucceed(true)
.setUpstreamId(upstreamId)
.setPayload(ByteString.copyFrom(resp))
.build()
}
val nativeCallMock = mock<NativeCall> {
on { nativeCall(any()) } doReturn Flux.just(
nativeCallResponse(1, response), nativeCallResponse(2, response2), nativeCallResponse(3, response3)
).flatMap {
when (it.id) {
1 -> Mono.just(it).delayElement(Duration.ofMillis(200))
2 -> Mono.just(it).delayElement(Duration.ofMillis(100))
else -> Mono.just(it)
}
}
}
val nativeCallStream = NativeCallStream(nativeCallMock)
val req = Mono.just(
NativeCallRequest.newBuilder()
.setSorted(true)
.build()
)

val result = nativeCallStream.nativeCall(req)

StepVerifier.create(result)
.expectNextMatches { it.payload.toByteArray().contentEquals(response) }
.expectNextMatches { it.payload.toByteArray().contentEquals(response2) }
.expectNextMatches { it.payload.toByteArray().contentEquals(response3) }
.expectComplete()
.verify(Duration.ofSeconds(3))
}
}
Loading

0 comments on commit b43d282

Please sign in to comment.