Skip to content

Commit

Permalink
Wait for head for the full response
Browse files Browse the repository at this point in the history
  • Loading branch information
Кирилл committed Jul 15, 2024
1 parent 6b6979c commit 17c7bfb
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.switchIfEmpty
import java.time.Duration

@Service
class SubscribeChainStatus(
Expand Down Expand Up @@ -79,8 +80,13 @@ class SubscribeChainStatus(
.map { toFullResponse(it!!, ms) }
.switchIfEmpty {
// in case if there is still no head we mush wait until we get it
ms.getHead()
.getFlux()
// also we have to use 2 approaches due to the head's flux can be stopped
Flux.concat(
ms.getHead()
.getFlux(),
Flux.interval(Duration.ofSeconds(3))
.mapNotNull { ms.getHead().getCurrent() },
)
.next()
.map { toFullResponse(it!!, ms) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class SubscribeChainStatusTest {
}

@Test
fun `first full event if there is already an ms head`() {
fun `first full event if there is already a ms head`() {
val head = mock<Head> {
on { getCurrent() } doReturn head(550)
on { getFlux() } doReturn Flux.empty()
Expand Down Expand Up @@ -106,6 +106,29 @@ class SubscribeChainStatusTest {
.verify(Duration.ofSeconds(1))
}

@Test
fun `first full event with awaiting a head from polling`() {
val head = mock<Head> {
on { getCurrent() } doReturn null doReturn head(550)
on { getFlux() } doReturn Flux.empty()
}
val ms = spy<TestMultistream> {
on { getHead() } doReturn head
on { stateEvents() } doReturn Flux.empty()
}
val msHolder = mock<MultistreamHolder> {
on { all() } doReturn listOf(ms)
}
val subscribeChainStatus = SubscribeChainStatus(msHolder, chainEventMapper)

StepVerifier.withVirtualTime { subscribeChainStatus.chainStatuses() }
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(3))
.expectNext(response(true))
.thenCancel()
.verify(Duration.ofSeconds(1))
}

@Test
fun `first full event with awaiting a head from a head stream and then state events`() {
val head = mock<Head> {
Expand Down

0 comments on commit 17c7bfb

Please sign in to comment.