Skip to content

Commit

Permalink
Fix solana subs (#449)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Apr 4, 2024
1 parent 2f5963f commit 44fab69
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class GenericSubscriptionConnect(

@Suppress("UNCHECKED_CAST")
override fun createConnection(): Flux<Any> {
return conn.subscribe(ChainRequest(topic, ListParams(getParams(params))))
return conn.subscribe(ChainRequest(topic, ListParams(getParams(params) as List<Any>)))
.data
.timeout(Duration.ofSeconds(60), Mono.empty())
.onErrorResume { Mono.empty() } as Flux<Any>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.emeraldpay.dshackle.upstream.generic

import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import org.junit.jupiter.api.Test
import org.mockito.Mockito.verify
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.mock
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
import java.time.Duration
import java.util.concurrent.atomic.AtomicReference

class GenericSubscriptionConnectTest {

@Test
fun `test request param is flat list`() {
val param: List<Any> = listOf("all")
val topic = "topic"
val response = "hello".toByteArray()
val ws = mock<WsSubscriptions> {
on { subscribe(ChainRequest(topic, ListParams(param))) } doReturn
WsSubscriptions.SubscribeData(Flux.just(response), "", AtomicReference(""))
}

val genericSubscriptionConnect = GenericSubscriptionConnect(ws, topic, param)

StepVerifier.create(genericSubscriptionConnect.createConnection())
.expectNext(response)
.expectComplete()
.verify(Duration.ofSeconds(1))

verify(ws).subscribe(ChainRequest(topic, ListParams(param)))
}
}

0 comments on commit 44fab69

Please sign in to comment.