Skip to content

Commit

Permalink
Short nodeId (#541)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Aug 6, 2024
1 parent 642e385 commit 0f143c9
Show file tree
Hide file tree
Showing 20 changed files with 108 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ class UpstreamsConfigReader(
return false
}
return upstream.nodeId?.let {
if (it !in 1..255) {
log.warn("Invalid node-id: $it. Must be in range [1, 255].")
if (it !in 1..65535) {
log.warn("Invalid node-id: $it. Must be in range [1, 65535].")
false
} else if (!knownNodeIds.add(it)) {
log.warn("Duplicated node-id: $it. Must be in unique.")
Expand Down
20 changes: 15 additions & 5 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -587,13 +587,23 @@ open class NativeCall(
val bytes = result.value
if (bytes.last() == quoteCode && result.resolvedUpstreamData.isNotEmpty()) {
val suffix = result.resolvedUpstreamData[0].nodeId
.toUByte()
.toString(16).padStart(2, padChar = '0').toByteArray()
bytes[bytes.lastIndex] = suffix.first()
return bytes + suffix.last() + quoteCode
.toUShort()
.toString(16).padStart(4, padChar = '0').toByteArray()
return resultArray(bytes, suffix)
}
return bytes
}

private fun resultArray(bytes: ByteArray, suffix: ByteArray): ByteArray {
val newBytes = ByteArray(bytes.size + suffix.size)
newBytes[newBytes.size - 1] = quoteCode
var index = bytes.size - 1

System.arraycopy(bytes, 0, newBytes, 0, bytes.size - 1)
for (byteVal in suffix) newBytes[index++] = byteVal

return newBytes
}
}

interface RequestDecorator {
Expand All @@ -608,7 +618,7 @@ open class NativeCall(
override fun processRequest(request: CallParams): CallParams {
if (request is ListParams) {
val filterId = request.list.first().toString()
val sanitized = filterId.substring(0, filterId.lastIndex - 1)
val sanitized = filterId.substring(0, filterId.lastIndex - 3)
return ListParams(listOf(sanitized))
}
return request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ open class GenericUpstreamCreator(
private val connectorFactoryCreatorResolver: ConnectorFactoryCreatorResolver,
private val versionRules: Supplier<CompatibleVersionsRules?>,
) : UpstreamCreator(chainsConfig, indexConfig, callTargets) {
private val hashes: MutableMap<Byte, Boolean> = HashMap()
private val hashes = HashSet<Short>()

override fun createUpstream(
upstreamsConfig: UpstreamsConfig.Upstream<*>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,28 @@ abstract class UpstreamCreator(
protected val log: Logger = LoggerFactory.getLogger(this::class.java)

companion object {
fun getHash(nodeId: Int?, obj: Any, hashes: MutableMap<Byte, Boolean>): Byte =
nodeId?.toByte() ?: (obj.hashCode() % 255).let {
if (it == 0) 1 else it
}.let { nonZeroHash ->
listOf<Function<Int, Int>>(
Function { i -> i },
Function { i -> (-i) },
Function { i -> 127 - abs(i) },
Function { i -> abs(i) - 128 },
).map {
it.apply(nonZeroHash).toByte()
}.firstOrNull {
hashes[it] != true
}?.let {
hashes[it] = true
it
} ?: (Byte.MIN_VALUE..Byte.MAX_VALUE).first {
it != 0 && hashes[it.toByte()] != true
}.toByte()
}
fun getHash(nodeId: Int?, obj: Any, hashes: MutableSet<Short>): Short {
val hash = nodeId?.toShort()
?: run {
(obj.hashCode() % 65535)
.let { if (it == 0) 1 else it }
.let { nonZeroHash ->
listOf<Function<Int, Int>>(
Function { i -> i },
Function { i -> (-i) },
Function { i -> 32767 - abs(i) },
Function { i -> abs(i) - 32768 },
)
.map { it.apply(nonZeroHash).toShort() }
.firstOrNull { !hashes.contains(it) }
}
?: (Short.MIN_VALUE..Short.MAX_VALUE).first {
it != 0 && !hashes.contains(it.toShort())
}.toShort()
}

return hash.also { hashes.add(it) }
}
}

fun createUpstream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicReference

abstract class DefaultUpstream(
private val id: String,
private val hash: Byte,
private val hash: Short,
defaultLag: Long?,
defaultAvail: UpstreamAvailability,
private val options: ChainOptions.Options,
Expand All @@ -46,7 +46,7 @@ abstract class DefaultUpstream(

constructor(
id: String,
hash: Byte,
hash: Short,
options: ChainOptions.Options,
role: UpstreamsConfig.UpstreamRole,
targets: CallMethods?,
Expand Down Expand Up @@ -153,7 +153,7 @@ abstract class DefaultUpstream(
sendUpstreamStateEvent(UpstreamChangeEvent.ChangeType.UPDATED)
}

override fun nodeId(): Byte = hash
override fun nodeId(): Short = hash

open fun getQuorumByLabel(): QuorumForLabels {
return node?.let { QuorumForLabels(it.copy(labels = fromMap(it.labels))) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ sealed class MatchesResponse {
) : MatchesResponse()

data class SameNodeResponse(
val upstreamHash: Byte,
val upstreamHash: Short,
) : MatchesResponse()

object AvailabilityResponse : MatchesResponse()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ abstract class Multistream(
return false
}

override fun nodeId(): Byte = 0
override fun nodeId(): Short = 0

override fun updateLowerBound(lowerBound: Long, type: LowerBoundType) {
// NOOP
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ class Selector {
}
}

class SameNodeMatcher(private val upstreamHash: Byte) : Matcher() {
class SameNodeMatcher(private val upstreamHash: Short) : Matcher() {

override fun matchesWithCause(up: Upstream): MatchesResponse =
if (up.nodeId() == upstreamHash) {
Expand Down
4 changes: 2 additions & 2 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ interface Upstream : Lifecycle {

fun <T : Upstream> cast(selfType: Class<T>): T

fun nodeId(): Byte
fun nodeId(): Short

data class UpstreamSettingsData(
val nodeId: Byte,
val nodeId: Short,
val id: String,
val nodeVersion: String,
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ abstract class BitcoinUpstream(
node: QuorumForLabels.QuorumItem,
val esploraClient: EsploraClient? = null,
chainConfig: ChainsConfig.ChainConfig,
) : DefaultUpstream(id, 0.toByte(), options, role, callMethods, node, chainConfig, chain) {
) : DefaultUpstream(id, 0.toShort(), options, role, callMethods, node, chainConfig, chain) {

constructor(
id: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ class EthereumCallSelector(
}
val filterId = list[0].toString()
if (filterId.length < 4) {
return Mono.just(Selector.SameNodeMatcher(0.toByte()))
return Mono.just(Selector.SameNodeMatcher(0.toShort()))
}
val hashHex = filterId.substring(filterId.length - 2)
val hashHex = filterId.substring(filterId.length - 4)
val nodeId = hashHex.toInt(16)
return Mono.just(Selector.SameNodeMatcher(nodeId.toByte()))
return Mono.just(Selector.SameNodeMatcher(nodeId.toShort()))
}

private fun blockTagSelector(params: String, pos: Int, paramName: String?, head: Head): Mono<Selector.Matcher> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import java.util.function.Supplier
open class GenericUpstream(
id: String,
chain: Chain,
hash: Byte,
hash: Short,
options: ChainOptions.Options,
role: UpstreamsConfig.UpstreamRole,
targets: CallMethods?,
Expand All @@ -58,7 +58,7 @@ open class GenericUpstream(
constructor(
config: UpstreamsConfig.Upstream<*>,
chain: Chain,
hash: Byte,
hash: Short,
options: ChainOptions.Options,
node: QuorumForLabels.QuorumItem?,
chainConfig: ChainsConfig.ChainConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import java.util.function.Function

open class GenericGrpcUpstream(
parentId: String,
hash: Byte,
hash: Short,
role: UpstreamsConfig.UpstreamRole,
chain: Chain,
private val remote: ReactorBlockchainGrpc.ReactorBlockchainStub,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class GrpcUpstreamCreator(
@Value("\${spring.application.max-metadata-size}")
private var maxMetadataSize: Int = Defaults.maxMetadataSize

private val hashes: MutableMap<Byte, Boolean> = HashMap()
private val hashes = HashSet<Short>()

companion object {
val grpcUpstreamsScheduler: Scheduler = Schedulers.fromExecutorService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ import kotlin.concurrent.withLock

class GrpcUpstreams(
private val id: String,
private val hash: Byte,
private val hash: Short,
private val role: UpstreamsConfig.UpstreamRole,
private val host: String,
private val port: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ class NativeCallSpec extends Specification {
setup:
def nativeCall = nativeCall()
def ctx = new NativeCall.ValidCallContext(1, null, Stub(Multistream), new Selector.UpstreamFilter(Selector.empty), new AlwaysQuorum(),
new NativeCall.ParsedCallDetails("eth_getFilterUpdates", new ListParams("0xabcd")),
new NativeCall.ParsedCallDetails("eth_getFilterUpdates", new ListParams("0xabcdcd")),
new NativeCall.WithFilterIdDecorator(), new NativeCall.NoneResultDecorator(), null, false, "reqId", 1)
when:
def act = nativeCall.parseParams(ctx)
Expand Down Expand Up @@ -609,7 +609,7 @@ class NativeCallSpec extends Specification {
def nativeCall = nativeCall(multistreamHolder)
nativeCall.requestReaderFactory = Mock(RequestReaderFactory) {
1 * create(_) >> Mock(RequestReader) {
1 * read(_) >> Mono.just(new RequestReader.Result("\"0xab\"".bytes, null, 1, List.of(new Upstream.UpstreamSettingsData((byte) 255, "", "")), null))
1 * read(_) >> Mono.just(new RequestReader.Result("\"0xab\"".bytes, null, 1, List.of(new Upstream.UpstreamSettingsData((short) 65535, "", "")), null))
}
}
def call = new NativeCall.ValidCallContext(1, 10, multistream, new Selector.UpstreamFilter(Selector.empty), quorum,
Expand All @@ -620,7 +620,7 @@ class NativeCallSpec extends Specification {
def resp = nativeCall.executeOnRemote(call).block(Duration.ofSeconds(1))
def act = objectMapper.readValue(resp.result, Object)
then:
act == "0xabff"
act == "0xabffff"
resp.nonce == 10
}

Expand Down Expand Up @@ -653,7 +653,7 @@ class NativeCallSpec extends Specification {
def resp = nativeCall.executeOnRemote(call).block(Duration.ofSeconds(1))
def act = objectMapper.readValue(resp.result, Object)
then:
act == "0xab01"
act == "0xab0001"
resp.nonce == 10
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,12 @@ class EthereumCallSelectorSpec extends Specification {

expect:
callSelector.getMatcher("eth_getFilterChanges", param, head, true).block()
== new Selector.SameNodeMatcher((byte)hash)
== new Selector.SameNodeMatcher((short)hash)

where:
param | hash
'["0xff09"]' | 9
'["0xff"]' | 255
'["0x0009"]' | 9
'["0x00ff"]' | 255
'[""]' | 0
'["0x0"]' | 0
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.emeraldpay.dshackle.startup.configure

import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.MethodSource

class UpstreamCreatorTest {

@ParameterizedTest
@MethodSource("data")
fun `test getHash`(
inputHash: Int?,
obj: Any,
answer: Short,
) {
val hash = UpstreamCreator.getHash(inputHash, obj, hashes)

assertThat(hash).isEqualTo(answer)

println(hashes)
}

companion object {
private val hashes = HashSet<Short>()

@JvmStatic
fun data(): List<Arguments> =
listOf(
Arguments.of(49, 1, 49.toShort()),
Arguments.of(4000, 1, 4000.toShort()),
Arguments.of(24000, 1, 24000.toShort()),
Arguments.of(null, 49, (-49).toShort()),
Arguments.of(null, 49, (32718).toShort()),
Arguments.of(null, 49, (-32719).toShort()),
Arguments.of(null, 49, (-32768).toShort()),
Arguments.of(null, 49, (-32767).toShort()),
Arguments.of(null, 32718, (-32718).toShort()),
Arguments.of(null, 32718, (-50).toShort()),
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import reactor.core.scheduler.Schedulers

class GenericGrpcUpstreamTest {
private val parentId = "testParent"
private val hash: Byte = 0x01
private val hash: Short = 0x01
private val role = UpstreamsConfig.UpstreamRole.PRIMARY
private val headSink = Sinks.many().multicast().directBestEffort<BlockchainOuterClass.ChainHead>()
private val remote =
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/configs/upstreams-node-id.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ upstreams:
ws:
url: "ws://localhost:9546"
- id: invalid_node_id
node-id: 256
node-id: 100000
chain: ethereum
connection:
grpc:
Expand Down

0 comments on commit 0f143c9

Please sign in to comment.