diff --git a/src/main/kotlin/io/reactivex/rxkotlin/Flowables.kt b/src/main/kotlin/io/reactivex/rxkotlin/Flowables.kt index 1de4fa8..0341e12 100644 --- a/src/main/kotlin/io/reactivex/rxkotlin/Flowables.kt +++ b/src/main/kotlin/io/reactivex/rxkotlin/Flowables.kt @@ -367,3 +367,17 @@ inline fun Flowable.zipWith( @SchedulerSupport(SchedulerSupport.NONE) fun Flowable.zipWith(other: Publisher): Flowable> = zipWith(other, BiFunction { t, u -> Pair(t, u) }) + +/** + * Converts a list of flowables to a flowable list + */ +@CheckReturnValue +@SchedulerSupport(SchedulerSupport.NONE) +fun List>.zipFlowables(): Flowable> { + if (isEmpty()) return Flowable.just(emptyList()) + + return Flowable.zip(this) { + @Suppress("UNCHECKED_CAST") + return@zip (it as Array).toList() + } +} diff --git a/src/main/kotlin/io/reactivex/rxkotlin/Maybes.kt b/src/main/kotlin/io/reactivex/rxkotlin/Maybes.kt index 7e8e128..6b68eb9 100644 --- a/src/main/kotlin/io/reactivex/rxkotlin/Maybes.kt +++ b/src/main/kotlin/io/reactivex/rxkotlin/Maybes.kt @@ -4,6 +4,7 @@ package io.reactivex.rxkotlin import io.reactivex.Maybe import io.reactivex.MaybeSource +import io.reactivex.Single import io.reactivex.annotations.CheckReturnValue import io.reactivex.annotations.SchedulerSupport import io.reactivex.functions.* @@ -130,3 +131,14 @@ inline fun Maybe.zipWith( @SchedulerSupport(SchedulerSupport.NONE) fun Maybe.zipWith(other: MaybeSource): Maybe> = zipWith(other, BiFunction { t, u -> Pair(t, u) }) + +@CheckReturnValue +@SchedulerSupport(SchedulerSupport.NONE) +fun List>.zipMaybes(): Maybe> { + if (isEmpty()) return Maybe.just(emptyList()) + + return Maybe.zip(this) { + @Suppress("UNCHECKED_CAST") + return@zip (it as Array).toList() + } +} diff --git a/src/main/kotlin/io/reactivex/rxkotlin/Observables.kt b/src/main/kotlin/io/reactivex/rxkotlin/Observables.kt index 85621a2..ac8b67c 100644 --- a/src/main/kotlin/io/reactivex/rxkotlin/Observables.kt +++ b/src/main/kotlin/io/reactivex/rxkotlin/Observables.kt @@ -301,3 +301,17 @@ inline fun Observable.zipWith( @SchedulerSupport(SchedulerSupport.NONE) fun Observable.zipWith(other: ObservableSource): Observable> = zipWith(other, BiFunction { t, u -> Pair(t, u) }) + +/** + * Converts a list of observables to an observable list + */ +@CheckReturnValue +@SchedulerSupport(SchedulerSupport.NONE) +fun List>.zipObservables(): Observable> { + if (isEmpty()) return Observable.just(emptyList()) + + return Observable.zip(this) { + @Suppress("UNCHECKED_CAST") + return@zip (it as Array).toList() + } +} diff --git a/src/main/kotlin/io/reactivex/rxkotlin/Singles.kt b/src/main/kotlin/io/reactivex/rxkotlin/Singles.kt index e1d8d1b..79e06d4 100644 --- a/src/main/kotlin/io/reactivex/rxkotlin/Singles.kt +++ b/src/main/kotlin/io/reactivex/rxkotlin/Singles.kt @@ -123,3 +123,14 @@ inline fun Single.zipWith( @SchedulerSupport(SchedulerSupport.NONE) fun Single.zipWith(other: SingleSource): Single> = zipWith(other, BiFunction { t, u -> Pair(t, u) }) + +@CheckReturnValue +@SchedulerSupport(SchedulerSupport.NONE) +fun List>.zipSingles(): Single> { + if (isEmpty()) return Single.just(emptyList()) + + return Single.zip(this) { + @Suppress("UNCHECKED_CAST") + return@zip (it as Array).toList() + } +} diff --git a/src/test/kotlin/io/reactivex/rxkotlin/FlowablesTest.kt b/src/test/kotlin/io/reactivex/rxkotlin/FlowablesTest.kt new file mode 100644 index 0000000..3448f2d --- /dev/null +++ b/src/test/kotlin/io/reactivex/rxkotlin/FlowablesTest.kt @@ -0,0 +1,30 @@ +package io.reactivex.rxkotlin + +import io.reactivex.Flowable +import org.junit.Test + +class FlowablesTest : KotlinTests() { + + @Test fun zipFlowablesWithEmptyListReturnsEmptyList() { + val flowables = emptyList>() + + val zippedFlowables = flowables.zipFlowables().blockingFirst() + + assert(zippedFlowables.isEmpty()) + } + + @Test fun zipObservablesWithNonEmptyListReturnsNonEmptyListWithCorrectElements() { + val flowables = listOf( + Flowable.just(1), + Flowable.just(2), + Flowable.just(3) + ) + + val zippedFlowables = flowables.zipFlowables().blockingFirst() + + assert(zippedFlowables.size == 3) + assert(zippedFlowables[0] == 1) + assert(zippedFlowables[1] == 2) + assert(zippedFlowables[2] == 3) + } +} \ No newline at end of file diff --git a/src/test/kotlin/io/reactivex/rxkotlin/MaybesTest.kt b/src/test/kotlin/io/reactivex/rxkotlin/MaybesTest.kt new file mode 100644 index 0000000..5b58f8d --- /dev/null +++ b/src/test/kotlin/io/reactivex/rxkotlin/MaybesTest.kt @@ -0,0 +1,32 @@ +package io.reactivex.rxkotlin + +import io.reactivex.Maybe +import org.junit.Test + +class MaybesTest : KotlinTests() { + + @Test + fun zipMaybesWithEmptyListReturnsEmptyList() { + val maybes = emptyList>() + + val zippedMaybes = maybes.zipMaybes().blockingGet() + + assert(zippedMaybes.isEmpty()) + } + + @Test + fun zipMaybesWithNonEmptyListReturnsNonEmptyListWithCorrectElements() { + val maybes = listOf( + Maybe.just(1), + Maybe.just(2), + Maybe.just(3) + ) + + val zippedMaybes = maybes.zipMaybes().blockingGet() + + assert(zippedMaybes.size == 3) + assert(zippedMaybes[0] == 1) + assert(zippedMaybes[1] == 2) + assert(zippedMaybes[2] == 3) + } +} \ No newline at end of file diff --git a/src/test/kotlin/io/reactivex/rxkotlin/ObservablesTest.kt b/src/test/kotlin/io/reactivex/rxkotlin/ObservablesTest.kt index 1e03c15..7f6aa59 100644 --- a/src/test/kotlin/io/reactivex/rxkotlin/ObservablesTest.kt +++ b/src/test/kotlin/io/reactivex/rxkotlin/ObservablesTest.kt @@ -44,4 +44,27 @@ class ObservablesTest { assertEquals(triple, result) } + + @Test fun zipObservablesWithEmptyListReturnsEmptyList() { + val observables = emptyList>() + + val zippedObservables = observables.zipObservables().blockingFirst() + + assert(zippedObservables.isEmpty()) + } + + @Test fun zipObservablesWithNonEmptyListReturnsNonEmptyListWithCorrectElements() { + val observables = listOf( + Observable.just(1), + Observable.just(2), + Observable.just(3) + ) + + val zippedObservables = observables.zipObservables().blockingFirst() + + assert(zippedObservables.size == 3) + assert(zippedObservables[0] == 1) + assert(zippedObservables[1] == 2) + assert(zippedObservables[2] == 3) + } } \ No newline at end of file diff --git a/src/test/kotlin/io/reactivex/rxkotlin/SinglesTest.kt b/src/test/kotlin/io/reactivex/rxkotlin/SinglesTest.kt index 89fbf68..8a94446 100644 --- a/src/test/kotlin/io/reactivex/rxkotlin/SinglesTest.kt +++ b/src/test/kotlin/io/reactivex/rxkotlin/SinglesTest.kt @@ -107,6 +107,29 @@ class SinglesTest : KotlinTests() { assert(nine == 9, { -> "Should equal nine"}) }).blockingGet() } + + @Test fun zipSinglesWithEmptyListReturnsEmptyList() { + val singles = emptyList>() + + val zippedSingles = singles.zipSingles().blockingGet() + + assert(zippedSingles.isEmpty()) + } + + @Test fun zipSinglesWithNonEmptyListReturnsNonEmptyListWithCorrectElements() { + val singles = listOf( + Single.just(1), + Single.just(2), + Single.just(3) + ) + + val zippedSingles = singles.zipSingles().blockingGet() + + assert(zippedSingles.size == 3) + assert(zippedSingles[0] == 1) + assert(zippedSingles[1] == 2) + assert(zippedSingles[2] == 3) + } } fun SingleSourceInt(i: Int): SingleSource {