From 387cd31c98244c59d5f5ea2db0c16b7432db51ac Mon Sep 17 00:00:00 2001 From: KirillPamPam Date: Tue, 28 May 2024 13:51:25 +0400 Subject: [PATCH] Fix streaming spam (#486) --- .../rpcclient/stream/JsonRpcStreamParser.kt | 105 +++++++++--------- .../dshackle/upstream/stream/Responses.kt | 15 --- .../stream/JsonRpcStreamParserTest.kt | 56 +++++++--- 3 files changed, 95 insertions(+), 81 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/stream/JsonRpcStreamParser.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/stream/JsonRpcStreamParser.kt index 157391e25..dd9f262f1 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/stream/JsonRpcStreamParser.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/stream/JsonRpcStreamParser.kt @@ -37,10 +37,7 @@ class JsonRpcStreamParser( private const val QUOTE: Byte = '"'.code.toByte() } - fun streamParse( - statusCode: Int, - response: Flux, - ): Mono { + fun streamParse(statusCode: Int, response: Flux): Mono { val firstPartSize = AtomicInteger() return response.bufferUntil { if (firstPartSize.get() > firstChunkMaxSize) { @@ -110,10 +107,7 @@ class JsonRpcStreamParser( } } - private fun aggregateResponse( - response: Flux, - statusCode: Int, - ): Mono { + private fun aggregateResponse(response: Flux, statusCode: Int): Mono { return ByteBufFlux.fromInbound(response).aggregate().asByteArray() .map { AggregateResponse(it, statusCode) } } @@ -170,7 +164,8 @@ class JsonRpcStreamParser( var response: SingleResponse? = null try { jsonFactory.createParser(firstBytes).use { parser -> - while (parser.nextToken() != null) { + while (true) { + parser.nextToken() if (firstBytes.size == parser.currentLocation.byteOffset.toInt()) { break } @@ -178,56 +173,51 @@ class JsonRpcStreamParser( if (parser.currentName == "result") { val token = parser.nextToken() val tokenStart = parser.tokenLocation.byteOffset.toInt() - if (token.isScalarValue) { + response = if (token.isScalarValue) { val count = CountQuotesAndSlashes(AtomicInteger(1)) whatCount.set(count) - response = - SingleResponse( - processScalarValue(parser, tokenStart, firstBytes, count, endStream), - null, - ).merge(response) + SingleResponse( + processScalarValue(parser, tokenStart, firstBytes, count, endStream), + null, + ) } else { when (token) { JsonToken.START_OBJECT -> { - val count = - CountObjectBrackets( - AtomicInteger(1), - CountQuotesAndSlashes(AtomicInteger(0)), - ) + val count = CountObjectBrackets( + AtomicInteger(1), + CountQuotesAndSlashes(AtomicInteger(0)), + ) whatCount.set(count) - response = - SingleResponse( - processAndCountBrackets( - tokenStart, - firstBytes, - count, - endStream, - OBJECT_OPEN_BRACKET, - OBJECT_CLOSE_BRACKET, - ), - null, - ).merge(response) + SingleResponse( + processAndCountBrackets( + tokenStart, + firstBytes, + count, + endStream, + OBJECT_OPEN_BRACKET, + OBJECT_CLOSE_BRACKET, + ), + null, + ) } JsonToken.START_ARRAY -> { - val count = - CountArrayBrackets( - AtomicInteger(1), - CountQuotesAndSlashes(AtomicInteger(0)), - ) + val count = CountArrayBrackets( + AtomicInteger(1), + CountQuotesAndSlashes(AtomicInteger(0)), + ) whatCount.set(count) - response = - SingleResponse( - processAndCountBrackets( - tokenStart, - firstBytes, - count, - endStream, - ARRAY_OPEN_BRACKET, - ARRAY_CLOSE_BRACKET, - ), - null, - ).merge(response) + SingleResponse( + processAndCountBrackets( + tokenStart, + firstBytes, + count, + endStream, + ARRAY_OPEN_BRACKET, + ARRAY_CLOSE_BRACKET, + ), + null, + ) } else -> { @@ -235,15 +225,28 @@ class JsonRpcStreamParser( } } } + if (endStream.get()) { + // we parsed the whole result field, and we can go parse further + parser.skipChildren() + } else { + // otherwise return response assuming there is no error field + return response + } } else if (parser.currentName == "error") { - return SingleResponse(null, responseRpcParser.readError(parser)).merge(response) + return SingleResponse(response?.result, responseRpcParser.readError(parser)) } } } return response } } catch (e: Exception) { - log.warn("Streaming parsing exception: {}", e.message) + if (response == null) { + // something terrible happened when we even don't have a response that means we haven't parsed the first chunk + log.warn("Streaming parsing exception: {}", e.message) + } + // there may be other parsing exceptions that means we have parsed the first chunk, and we have a response + // but when we want to parse further we can get an error if the first chunk is not a finished json + // it doesn't matter, we have the result from the response and can return it return response } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/stream/Responses.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/stream/Responses.kt index c9903da90..1b2e7a4f0 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/stream/Responses.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/stream/Responses.kt @@ -13,21 +13,6 @@ data class SingleResponse( fun noResponse() = result == null && error == null - fun merge(other: SingleResponse?): SingleResponse { - if (other == null) { - return this - } - var newResult: ByteArray? = result - var newError: ChainCallError? = error - if (newResult == null && other.result != null) { - newResult = other.result - } - if (newError == null && other.error != null) { - newError = other.error - } - return SingleResponse(newResult, newError) - } - override fun equals(other: Any?): Boolean { if (this === other) return true if (other !is SingleResponse) return false diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/stream/JsonRpcStreamParserTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/stream/JsonRpcStreamParserTest.kt index a5941c723..2d54750f2 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/stream/JsonRpcStreamParserTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/stream/JsonRpcStreamParserTest.kt @@ -12,6 +12,7 @@ import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.Arguments.of import org.junit.jupiter.params.provider.MethodSource import reactor.core.publisher.Flux import reactor.test.StepVerifier @@ -60,11 +61,11 @@ class JsonRpcStreamParserTest { @ParameterizedTest @MethodSource("data") fun `if first part has result field then single response`( - response: ByteArray, + response: List, result: ByteArray, ) { val statusCode = 200 - val stream: Flux = Flux.just(response) + val stream: Flux = Flux.fromIterable(response) StepVerifier.create(streamParser.streamParse(statusCode, stream)) .expectNext(SingleResponse(result, null)) @@ -116,24 +117,49 @@ class JsonRpcStreamParserTest { companion object { @JvmStatic fun data(): List = listOf( - Arguments.of("{\"id\": 2,\"result\": \"0x12\"}".toByteArray(), "\"0x12\"".toByteArray()), - Arguments.of("{\"id\": 2,\"result\": 11}".toByteArray(), "11".toByteArray()), - Arguments.of("{\"id\": 2,\"result\": false}".toByteArray(), "false".toByteArray()), - Arguments.of("{\"id\": 2,\"result\": null}".toByteArray(), "null".toByteArray()), - Arguments.of("{\"id\": 2,\"result\": {\"name\": \"value\"}".toByteArray(), "{\"name\": \"value\"}".toByteArray()), - Arguments.of("{\"id\": 2,\"result\": [{\"name\": \"value\"}]".toByteArray(), "[{\"name\": \"value\"}]".toByteArray()), + of( + listOf("{\"id\": 2,\"result\": \"0x12\"}".toByteArray()), + "\"0x12\"".toByteArray(), + ), + of( + listOf("{\"id\": 2,\"result\": 11}".toByteArray()), + "11".toByteArray(), + ), + of( + listOf("{\"id\": 2,\"result\": false}".toByteArray()), + "false".toByteArray(), + ), + of( + listOf("{\"id\": 2,\"result\": null}".toByteArray()), + "null".toByteArray(), + ), + of( + listOf("{\"id\": 2,\"result\": {\"name\": \"value\"}}".toByteArray()), + "{\"name\": \"value\"}".toByteArray(), + ), + of( + listOf("{\"id\": 2,\"result\": [{\"name\": \"value\"}]}".toByteArray()), + "[{\"name\": \"value\"}]".toByteArray(), + ), + of( + listOf( + "{\"id\": 2,\"result\": [{\"name\": \"value\"}], \"other".toByteArray(), + "\": \"newField\"}".toByteArray(), + ), + "[{\"name\": \"value\"}]".toByteArray(), + ), ) @JvmStatic fun dataStream(): List = listOf( - Arguments.of( + of( listOf("{\"id\": 2,\"result\": \"0x12".toByteArray(), "222\"}".toByteArray()), listOf( Chunk("\"0x12".toByteArray(), false), Chunk("222\"".toByteArray(), true), ), ), - Arguments.of( + of( listOf( "{\"id\": 2,\"result\": \"0x12".toByteArray(), "123\\\"".toByteArray(), @@ -145,7 +171,7 @@ class JsonRpcStreamParserTest { Chunk("222\"".toByteArray(), true), ), ), - Arguments.of( + of( listOf( "{\"id\": 2,\"result\": \"0x12".toByteArray(), "1\\n23\\\"".toByteArray(), @@ -159,7 +185,7 @@ class JsonRpcStreamParserTest { Chunk("\\222\\\\\\\\\"".toByteArray(), true), ), ), - Arguments.of( + of( listOf("{\"id\": 2,\"result\": {\"name\": ".toByteArray(), "\"bigName\"".toByteArray(), "}".toByteArray()), listOf( Chunk("{\"name\": ".toByteArray(), false), @@ -167,7 +193,7 @@ class JsonRpcStreamParserTest { Chunk("}".toByteArray(), true), ), ), - Arguments.of( + of( listOf( "{\"id\": 2,\"result\": [{\"name\": ".toByteArray(), "\"bigName\"".toByteArray(), @@ -180,7 +206,7 @@ class JsonRpcStreamParserTest { Chunk("}]".toByteArray(), true), ), ), - Arguments.of( + of( listOf( "{\"id\": 2,\"result\": [{\"na]me\": ".toByteArray(), "\"bigName]".toByteArray(), @@ -197,7 +223,7 @@ class JsonRpcStreamParserTest { Chunk("}]".toByteArray(), true), ), ), - Arguments.of( + of( listOf( "{\"id\": 2,\"result\": {\"name\": ".toByteArray(), "\"bigName}".toByteArray(),