diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfigReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfigReader.kt index f06055d36..617172758 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfigReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfigReader.kt @@ -252,8 +252,8 @@ class UpstreamsConfigReader( return false } return upstream.nodeId?.let { - if (it !in 1..255) { - log.warn("Invalid node-id: $it. Must be in range [1, 255].") + if (it !in 1..65535) { + log.warn("Invalid node-id: $it. Must be in range [1, 65535].") false } else if (!knownNodeIds.add(it)) { log.warn("Duplicated node-id: $it. Must be in unique.") diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt index df7e1c82c..d6380d009 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt @@ -587,13 +587,23 @@ open class NativeCall( val bytes = result.value if (bytes.last() == quoteCode && result.resolvedUpstreamData.isNotEmpty()) { val suffix = result.resolvedUpstreamData[0].nodeId - .toUByte() - .toString(16).padStart(2, padChar = '0').toByteArray() - bytes[bytes.lastIndex] = suffix.first() - return bytes + suffix.last() + quoteCode + .toUShort() + .toString(16).padStart(4, padChar = '0').toByteArray() + return resultArray(bytes, suffix) } return bytes } + + private fun resultArray(bytes: ByteArray, suffix: ByteArray): ByteArray { + val newBytes = ByteArray(bytes.size + suffix.size) + newBytes[newBytes.size - 1] = quoteCode + var index = bytes.size - 1 + + System.arraycopy(bytes, 0, newBytes, 0, bytes.size - 1) + for (byteVal in suffix) newBytes[index++] = byteVal + + return newBytes + } } interface RequestDecorator { @@ -608,7 +618,7 @@ open class NativeCall( override fun processRequest(request: CallParams): CallParams { if (request is ListParams) { val filterId = request.list.first().toString() - val sanitized = filterId.substring(0, filterId.lastIndex - 1) + val sanitized = filterId.substring(0, filterId.lastIndex - 3) return ListParams(listOf(sanitized)) } return request diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt index 945e438c5..06c62ff50 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt @@ -24,7 +24,7 @@ open class GenericUpstreamCreator( private val connectorFactoryCreatorResolver: ConnectorFactoryCreatorResolver, private val versionRules: Supplier, ) : UpstreamCreator(chainsConfig, indexConfig, callTargets) { - private val hashes: MutableMap = HashMap() + private val hashes = HashSet() override fun createUpstream( upstreamsConfig: UpstreamsConfig.Upstream<*>, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamCreator.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamCreator.kt index f4a88e567..ab3841bf4 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamCreator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamCreator.kt @@ -22,26 +22,28 @@ abstract class UpstreamCreator( protected val log: Logger = LoggerFactory.getLogger(this::class.java) companion object { - fun getHash(nodeId: Int?, obj: Any, hashes: MutableMap): Byte = - nodeId?.toByte() ?: (obj.hashCode() % 255).let { - if (it == 0) 1 else it - }.let { nonZeroHash -> - listOf>( - Function { i -> i }, - Function { i -> (-i) }, - Function { i -> 127 - abs(i) }, - Function { i -> abs(i) - 128 }, - ).map { - it.apply(nonZeroHash).toByte() - }.firstOrNull { - hashes[it] != true - }?.let { - hashes[it] = true - it - } ?: (Byte.MIN_VALUE..Byte.MAX_VALUE).first { - it != 0 && hashes[it.toByte()] != true - }.toByte() - } + fun getHash(nodeId: Int?, obj: Any, hashes: MutableSet): Short { + val hash = nodeId?.toShort() + ?: run { + (obj.hashCode() % 65535) + .let { if (it == 0) 1 else it } + .let { nonZeroHash -> + listOf>( + Function { i -> i }, + Function { i -> (-i) }, + Function { i -> 32767 - abs(i) }, + Function { i -> abs(i) - 32768 }, + ) + .map { it.apply(nonZeroHash).toShort() } + .firstOrNull { !hashes.contains(it) } + } + ?: (Short.MIN_VALUE..Short.MAX_VALUE).first { + it != 0 && !hashes.contains(it.toShort()) + }.toShort() + } + + return hash.also { hashes.add(it) } + } } fun createUpstream( diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt index f8f57a037..b5d6b896e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt @@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicReference abstract class DefaultUpstream( private val id: String, - private val hash: Byte, + private val hash: Short, defaultLag: Long?, defaultAvail: UpstreamAvailability, private val options: ChainOptions.Options, @@ -46,7 +46,7 @@ abstract class DefaultUpstream( constructor( id: String, - hash: Byte, + hash: Short, options: ChainOptions.Options, role: UpstreamsConfig.UpstreamRole, targets: CallMethods?, @@ -153,7 +153,7 @@ abstract class DefaultUpstream( sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.UPDATED) } - override fun nodeId(): Byte = hash + override fun nodeId(): Short = hash open fun getQuorumByLabel(): QuorumForLabels { return node?.let { QuorumForLabels(it.copy(labels = fromMap(it.labels))) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/MatchesResponse.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/MatchesResponse.kt index 4180ac72a..302534333 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/MatchesResponse.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/MatchesResponse.kt @@ -82,7 +82,7 @@ sealed class MatchesResponse { ) : MatchesResponse() data class SameNodeResponse( - val upstreamHash: Byte, + val upstreamHash: Short, ) : MatchesResponse() object AvailabilityResponse : MatchesResponse() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index e1a58f8bd..56208c2d9 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -389,7 +389,7 @@ abstract class Multistream( return false } - override fun nodeId(): Byte = 0 + override fun nodeId(): Short = 0 override fun updateLowerBound(lowerBound: Long, type: LowerBoundType) { // NOOP diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt index a69c157dd..fa47b83ba 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt @@ -612,7 +612,7 @@ class Selector { } } - class SameNodeMatcher(private val upstreamHash: Byte) : Matcher() { + class SameNodeMatcher(private val upstreamHash: Short) : Matcher() { override fun matchesWithCause(up: Upstream): MatchesResponse = if (up.nodeId() == upstreamHash) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt index 4e9e67fe6..396d41c34 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt @@ -60,10 +60,10 @@ interface Upstream : Lifecycle { fun cast(selfType: Class): T - fun nodeId(): Byte + fun nodeId(): Short data class UpstreamSettingsData( - val nodeId: Byte, + val nodeId: Short, val id: String, val nodeVersion: String, ) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinUpstream.kt index 7c7d682db..da3813a54 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinUpstream.kt @@ -33,7 +33,7 @@ abstract class BitcoinUpstream( node: QuorumForLabels.QuorumItem, val esploraClient: EsploraClient? = null, chainConfig: ChainsConfig.ChainConfig, -) : DefaultUpstream(id, 0.toByte(), options, role, callMethods, node, chainConfig, chain) { +) : DefaultUpstream(id, 0.toShort(), options, role, callMethods, node, chainConfig, chain) { constructor( id: String, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/EthereumCallSelector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/EthereumCallSelector.kt index 64a181daa..af89bb965 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/EthereumCallSelector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/EthereumCallSelector.kt @@ -97,11 +97,11 @@ class EthereumCallSelector( } val filterId = list[0].toString() if (filterId.length < 4) { - return Mono.just(Selector.SameNodeMatcher(0.toByte())) + return Mono.just(Selector.SameNodeMatcher(0.toShort())) } - val hashHex = filterId.substring(filterId.length - 2) + val hashHex = filterId.substring(filterId.length - 4) val nodeId = hashHex.toInt(16) - return Mono.just(Selector.SameNodeMatcher(nodeId.toByte())) + return Mono.just(Selector.SameNodeMatcher(nodeId.toShort())) } private fun blockTagSelector(params: String, pos: Int, paramName: String?, head: Head): Mono { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index 4f5051cc6..a5f7f795c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -41,7 +41,7 @@ import java.util.function.Supplier open class GenericUpstream( id: String, chain: Chain, - hash: Byte, + hash: Short, options: ChainOptions.Options, role: UpstreamsConfig.UpstreamRole, targets: CallMethods?, @@ -58,7 +58,7 @@ open class GenericUpstream( constructor( config: UpstreamsConfig.Upstream<*>, chain: Chain, - hash: Byte, + hash: Short, options: ChainOptions.Options, node: QuorumForLabels.QuorumItem?, chainConfig: ChainsConfig.ChainConfig, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt index 9a620ed6f..715a70ede 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt @@ -56,7 +56,7 @@ import java.util.function.Function open class GenericGrpcUpstream( parentId: String, - hash: Byte, + hash: Short, role: UpstreamsConfig.UpstreamRole, chain: Chain, private val remote: ReactorBlockchainGrpc.ReactorBlockchainStub, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreamCreator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreamCreator.kt index f0dff1ba5..744adef05 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreamCreator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreamCreator.kt @@ -32,7 +32,7 @@ class GrpcUpstreamCreator( @Value("\${spring.application.max-metadata-size}") private var maxMetadataSize: Int = Defaults.maxMetadataSize - private val hashes: MutableMap = HashMap() + private val hashes = HashSet() companion object { val grpcUpstreamsScheduler: Scheduler = Schedulers.fromExecutorService( diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt index 447ee5c75..dbb532a0d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt @@ -69,7 +69,7 @@ import kotlin.concurrent.withLock class GrpcUpstreams( private val id: String, - private val hash: Byte, + private val hash: Short, private val role: UpstreamsConfig.UpstreamRole, private val host: String, private val port: Int, diff --git a/src/test/groovy/io/emeraldpay/dshackle/rpc/NativeCallSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/rpc/NativeCallSpec.groovy index 34dead260..99cc3add7 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/rpc/NativeCallSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/rpc/NativeCallSpec.groovy @@ -581,7 +581,7 @@ class NativeCallSpec extends Specification { setup: def nativeCall = nativeCall() def ctx = new NativeCall.ValidCallContext(1, null, Stub(Multistream), new Selector.UpstreamFilter(Selector.empty), new AlwaysQuorum(), - new NativeCall.ParsedCallDetails("eth_getFilterUpdates", new ListParams("0xabcd")), + new NativeCall.ParsedCallDetails("eth_getFilterUpdates", new ListParams("0xabcdcd")), new NativeCall.WithFilterIdDecorator(), new NativeCall.NoneResultDecorator(), null, false, "reqId", 1) when: def act = nativeCall.parseParams(ctx) @@ -609,7 +609,7 @@ class NativeCallSpec extends Specification { def nativeCall = nativeCall(multistreamHolder) nativeCall.requestReaderFactory = Mock(RequestReaderFactory) { 1 * create(_) >> Mock(RequestReader) { - 1 * read(_) >> Mono.just(new RequestReader.Result("\"0xab\"".bytes, null, 1, List.of(new Upstream.UpstreamSettingsData((byte) 255, "", "")), null)) + 1 * read(_) >> Mono.just(new RequestReader.Result("\"0xab\"".bytes, null, 1, List.of(new Upstream.UpstreamSettingsData((short) 65535, "", "")), null)) } } def call = new NativeCall.ValidCallContext(1, 10, multistream, new Selector.UpstreamFilter(Selector.empty), quorum, @@ -620,7 +620,7 @@ class NativeCallSpec extends Specification { def resp = nativeCall.executeOnRemote(call).block(Duration.ofSeconds(1)) def act = objectMapper.readValue(resp.result, Object) then: - act == "0xabff" + act == "0xabffff" resp.nonce == 10 } @@ -653,7 +653,7 @@ class NativeCallSpec extends Specification { def resp = nativeCall.executeOnRemote(call).block(Duration.ofSeconds(1)) def act = objectMapper.readValue(resp.result, Object) then: - act == "0xab01" + act == "0xab0001" resp.nonce == 10 } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/calls/EthereumCallSelectorSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/calls/EthereumCallSelectorSpec.groovy index fa9c943ac..d6ee0bb8e 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/calls/EthereumCallSelectorSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/calls/EthereumCallSelectorSpec.groovy @@ -194,12 +194,12 @@ class EthereumCallSelectorSpec extends Specification { expect: callSelector.getMatcher("eth_getFilterChanges", param, head, true).block() - == new Selector.SameNodeMatcher((byte)hash) + == new Selector.SameNodeMatcher((short)hash) where: param | hash - '["0xff09"]' | 9 - '["0xff"]' | 255 + '["0x0009"]' | 9 + '["0x00ff"]' | 255 '[""]' | 0 '["0x0"]' | 0 } diff --git a/src/test/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamCreatorTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamCreatorTest.kt new file mode 100644 index 000000000..0806eb939 --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/startup/configure/UpstreamCreatorTest.kt @@ -0,0 +1,42 @@ +package io.emeraldpay.dshackle.startup.configure + +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.MethodSource + +class UpstreamCreatorTest { + + @ParameterizedTest + @MethodSource("data") + fun `test getHash`( + inputHash: Int?, + obj: Any, + answer: Short, + ) { + val hash = UpstreamCreator.getHash(inputHash, obj, hashes) + + assertThat(hash).isEqualTo(answer) + + println(hashes) + } + + companion object { + private val hashes = HashSet() + + @JvmStatic + fun data(): List = + listOf( + Arguments.of(49, 1, 49.toShort()), + Arguments.of(4000, 1, 4000.toShort()), + Arguments.of(24000, 1, 24000.toShort()), + Arguments.of(null, 49, (-49).toShort()), + Arguments.of(null, 49, (32718).toShort()), + Arguments.of(null, 49, (-32719).toShort()), + Arguments.of(null, 49, (-32768).toShort()), + Arguments.of(null, 49, (-32767).toShort()), + Arguments.of(null, 32718, (-32718).toShort()), + Arguments.of(null, 32718, (-50).toShort()), + ) + } +} diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstreamTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstreamTest.kt index 2e97562de..5420e256e 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstreamTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstreamTest.kt @@ -22,7 +22,7 @@ import reactor.core.scheduler.Schedulers class GenericGrpcUpstreamTest { private val parentId = "testParent" - private val hash: Byte = 0x01 + private val hash: Short = 0x01 private val role = UpstreamsConfig.UpstreamRole.PRIMARY private val headSink = Sinks.many().multicast().directBestEffort() private val remote = diff --git a/src/test/resources/configs/upstreams-node-id.yaml b/src/test/resources/configs/upstreams-node-id.yaml index cfea4978f..f492e0b27 100644 --- a/src/test/resources/configs/upstreams-node-id.yaml +++ b/src/test/resources/configs/upstreams-node-id.yaml @@ -26,7 +26,7 @@ upstreams: ws: url: "ws://localhost:9546" - id: invalid_node_id - node-id: 256 + node-id: 100000 chain: ethereum connection: grpc: