diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatus.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatus.kt index e8c468f2b..3322a06d6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatus.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatus.kt @@ -12,6 +12,7 @@ import org.springframework.stereotype.Service import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.kotlin.core.publisher.switchIfEmpty +import java.time.Duration @Service class SubscribeChainStatus( @@ -79,8 +80,13 @@ class SubscribeChainStatus( .map { toFullResponse(it!!, ms) } .switchIfEmpty { // in case if there is still no head we mush wait until we get it - ms.getHead() - .getFlux() + // also we have to use 2 approaches due to the head's flux can be stopped + Flux.concat( + ms.getHead() + .getFlux(), + Flux.interval(Duration.ofSeconds(3)) + .mapNotNull { ms.getHead().getCurrent() }, + ) .next() .map { toFullResponse(it!!, ms) } } diff --git a/src/test/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatusTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatusTest.kt index 37c894920..7b95f0e51 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatusTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/rpc/SubscribeChainStatusTest.kt @@ -63,7 +63,7 @@ class SubscribeChainStatusTest { } @Test - fun `first full event if there is already an ms head`() { + fun `first full event if there is already a ms head`() { val head = mock { on { getCurrent() } doReturn head(550) on { getFlux() } doReturn Flux.empty() @@ -106,6 +106,29 @@ class SubscribeChainStatusTest { .verify(Duration.ofSeconds(1)) } + @Test + fun `first full event with awaiting a head from polling`() { + val head = mock { + on { getCurrent() } doReturn null doReturn head(550) + on { getFlux() } doReturn Flux.empty() + } + val ms = spy { + on { getHead() } doReturn head + on { stateEvents() } doReturn Flux.empty() + } + val msHolder = mock { + on { all() } doReturn listOf(ms) + } + val subscribeChainStatus = SubscribeChainStatus(msHolder, chainEventMapper) + + StepVerifier.withVirtualTime { subscribeChainStatus.chainStatuses() } + .expectSubscription() + .expectNoEvent(Duration.ofSeconds(3)) + .expectNext(response(true)) + .thenCancel() + .verify(Duration.ofSeconds(1)) + } + @Test fun `first full event with awaiting a head from a head stream and then state events`() { val head = mock {