From 28db815b168682bb0d6d34524447b9c849154f6f Mon Sep 17 00:00:00 2001 From: Stepan Goncharov Date: Wed, 3 May 2017 10:41:38 +0800 Subject: [PATCH] combineLatest extensions for Observable and Flowable (#116) * combineLatest extensions for Observable and Flowable * jdk8 --- .travis.yml | 2 +- .../kotlin/io/reactivex/rxkotlin/flowable.kt | 14 ++++ .../io/reactivex/rxkotlin/observable.kt | 13 ++++ .../io/reactivex/rxkotlin/FlowableTest.kt | 78 ++++++++++++------- .../io/reactivex/rxkotlin/ObservableTest.kt | 16 +++- 5 files changed, 92 insertions(+), 31 deletions(-) diff --git a/.travis.yml b/.travis.yml index b481079..85a9559 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: java jdk: - - oraclejdk7 + - oraclejdk8 sudo: false # as per http://blog.travis-ci.com/2014-12-17-faster-builds-with-container-based-infrastructure/ diff --git a/src/main/kotlin/io/reactivex/rxkotlin/flowable.kt b/src/main/kotlin/io/reactivex/rxkotlin/flowable.kt index 554e705..1e7e552 100644 --- a/src/main/kotlin/io/reactivex/rxkotlin/flowable.kt +++ b/src/main/kotlin/io/reactivex/rxkotlin/flowable.kt @@ -1,6 +1,8 @@ package io.reactivex.rxkotlin import io.reactivex.Flowable +import io.reactivex.functions.BiFunction +import io.reactivex.functions.Function3 fun BooleanArray.toFlowable(): Flowable = asIterable().toFlowable() @@ -63,6 +65,18 @@ private fun Iterator.toIterable() = object : Iterable { override fun iterator(): Iterator = this@toIterable } +/** + * Combine latest operator that produces [Pair] + */ +fun Flowable.combineLatest(flowable: Flowable): Flowable> + = Flowable.combineLatest(this, flowable, BiFunction(::Pair)) + +/** + * Combine latest operator that produces [Triple] + */ +fun Flowable.combineLatest(flowable1: Flowable, flowable2: Flowable): Flowable> + = Flowable.combineLatest(this, flowable1, flowable2, Function3(::Triple)) + //EXTENSION FUNCTION OPERATORS /** diff --git a/src/main/kotlin/io/reactivex/rxkotlin/observable.kt b/src/main/kotlin/io/reactivex/rxkotlin/observable.kt index fc491c5..4b7d41b 100644 --- a/src/main/kotlin/io/reactivex/rxkotlin/observable.kt +++ b/src/main/kotlin/io/reactivex/rxkotlin/observable.kt @@ -1,6 +1,8 @@ package io.reactivex.rxkotlin import io.reactivex.Observable +import io.reactivex.functions.BiFunction +import io.reactivex.functions.Function3 fun BooleanArray.toObservable(): Observable = asIterable().toObservable() @@ -63,6 +65,17 @@ private fun Iterator.toIterable() = object : Iterable { override fun iterator(): Iterator = this@toIterable } +/** + * Combine latest operator that produces [Pair] + */ +fun Observable.combineLatest(observable: Observable): Observable> + = Observable.combineLatest(this, observable, BiFunction(::Pair)) + +/** + * Combine latest operator that produces [Triple] + */ +fun Observable.combineLatest(observable1: Observable, observable2: Observable): Observable> + = Observable.combineLatest(this, observable1, observable2, Function3(::Triple)) // EXTENSION FUNCTION OPERATORS diff --git a/src/test/kotlin/io/reactivex/rxkotlin/FlowableTest.kt b/src/test/kotlin/io/reactivex/rxkotlin/FlowableTest.kt index 5b04c55..f22f1b8 100644 --- a/src/test/kotlin/io/reactivex/rxkotlin/FlowableTest.kt +++ b/src/test/kotlin/io/reactivex/rxkotlin/FlowableTest.kt @@ -1,37 +1,41 @@ package io.reactivex.rxkotlin +import io.reactivex.BackpressureStrategy import io.reactivex.Flowable -import org.junit.Assert +import io.reactivex.Flowable.create +import io.reactivex.FlowableEmitter +import org.junit.Assert.assertEquals +import org.junit.Assert.assertNotNull import org.junit.Ignore import org.junit.Test import java.util.concurrent.atomic.AtomicInteger class FlowableTest { - private fun bufferedFlowable(source: (io.reactivex.FlowableEmitter) -> Unit) = - io.reactivex.Flowable.create(source, io.reactivex.BackpressureStrategy.BUFFER) + private fun bufferedFlowable(source: (FlowableEmitter) -> Unit) = + create(source, BackpressureStrategy.BUFFER) @org.junit.Test fun testCreation() { - val o0: io.reactivex.Flowable = io.reactivex.Flowable.empty() + val o0: Flowable = Flowable.empty() val list = bufferedFlowable { s -> s.onNext(1) s.onNext(777) s.onComplete() }.toList().blockingGet() - org.junit.Assert.assertEquals(listOf(1, 777), list) - val o1: io.reactivex.Flowable = listOf(1, 2, 3).toFlowable() - val o2: io.reactivex.Flowable> = io.reactivex.Flowable.just(listOf(1, 2, 3)) - - val o3: io.reactivex.Flowable = io.reactivex.Flowable.defer { bufferedFlowable { s -> s.onNext(1) } } - val o4: io.reactivex.Flowable = Array(3) { 0 }.toFlowable() - val o5: io.reactivex.Flowable = IntArray(3).toFlowable() - - org.junit.Assert.assertNotNull(o0) - org.junit.Assert.assertNotNull(o1) - org.junit.Assert.assertNotNull(o2) - org.junit.Assert.assertNotNull(o3) - org.junit.Assert.assertNotNull(o4) - org.junit.Assert.assertNotNull(o5) + assertEquals(listOf(1, 777), list) + val o1: Flowable = listOf(1, 2, 3).toFlowable() + val o2: Flowable> = Flowable.just(listOf(1, 2, 3)) + + val o3: Flowable = Flowable.defer { bufferedFlowable { s -> s.onNext(1) } } + val o4: Flowable = Array(3) { 0 }.toFlowable() + val o5: Flowable = IntArray(3).toFlowable() + + assertNotNull(o0) + assertNotNull(o1) + assertNotNull(o2) + assertNotNull(o3) + assertNotNull(o4) + assertNotNull(o5) } @org.junit.Test fun testExampleFromReadme() { @@ -48,34 +52,34 @@ class FlowableTest { map { it.toString() }. blockingGet() - Assert.assertEquals("Hello", result) + assertEquals("Hello", result) } @Test fun iteratorFlowable() { - Assert.assertEquals(listOf(1, 2, 3), listOf(1, 2, 3).iterator().toFlowable().toList().blockingGet()) + assertEquals(listOf(1, 2, 3), listOf(1, 2, 3).iterator().toFlowable().toList().blockingGet()) } @Test fun intProgressionStep1Empty() { - Assert.assertEquals(listOf(1), (1..1).toFlowable().toList().blockingGet()) + assertEquals(listOf(1), (1..1).toFlowable().toList().blockingGet()) } @Test fun intProgressionStep1() { - Assert.assertEquals((1..10).toList(), (1..10).toFlowable().toList().blockingGet()) + assertEquals((1..10).toList(), (1..10).toFlowable().toList().blockingGet()) } @Test fun intProgressionDownTo() { - Assert.assertEquals((1 downTo 10).toList(), (1 downTo 10).toFlowable().toList().blockingGet()) + assertEquals((1 downTo 10).toList(), (1 downTo 10).toFlowable().toList().blockingGet()) } @Ignore("Too slow") @Test fun intProgressionOverflow() { - Assert.assertEquals((0..10).toList().reversed(), (-10..Integer.MAX_VALUE).toFlowable().skip(Integer.MAX_VALUE.toLong()).map { Integer.MAX_VALUE - it }.toList().blockingGet()) + assertEquals((0..10).toList().reversed(), (-10..Integer.MAX_VALUE).toFlowable().skip(Integer.MAX_VALUE.toLong()).map { Integer.MAX_VALUE - it }.toList().blockingGet()) } @Test fun testFold() { val result = listOf(1, 2, 3).toFlowable().reduce(0) { acc, e -> acc + e }.blockingGet() - Assert.assertEquals(6, result) + assertEquals(6, result) } @Test fun `kotlin sequence should produce expected items and flowable be able to handle em`() { @@ -93,11 +97,11 @@ class FlowableTest { toList(). subscribe() - Assert.assertEquals(100, generated.get()) + assertEquals(100, generated.get()) } @Test fun testFlatMapSequence() { - Assert.assertEquals( + assertEquals( listOf(1, 2, 3, 2, 3, 4, 3, 4, 5), listOf(1, 2, 3).toFlowable().flatMapSequence { listOf(it, it + 1, it + 2).asSequence() }.toList().blockingGet() ) @@ -105,12 +109,12 @@ class FlowableTest { @Test fun testCombineLatest() { val list = listOf(1, 2, 3, 2, 3, 4, 3, 4, 5) - Assert.assertEquals(list, list.map { Flowable.just(it) }.combineLatest { it }.blockingFirst()) + assertEquals(list, list.map { Flowable.just(it) }.combineLatest { it }.blockingFirst()) } @Test fun testZip() { val list = listOf(1, 2, 3, 2, 3, 4, 3, 4, 5) - Assert.assertEquals(list, list.map { Flowable.just(it) }.zip { it }.blockingFirst()) + assertEquals(list, list.map { Flowable.just(it) }.zip { it }.blockingFirst()) } @Test fun testCast() { @@ -129,4 +133,20 @@ class FlowableTest { flowable.test() .assertError(ClassCastException::class.java) } + + @Test fun combineLatestPair() { + Flowable.just(3) + .combineLatest(Flowable.just(10)) + .map { (x, y) -> x * y } + .test() + .assertValues(30) + } + + @Test fun combineLatestTriple() { + Flowable.just(3) + .combineLatest(Flowable.just(10), Flowable.just(20)) + .map { (x, y, z) -> x * y * z } + .test() + .assertValues(600) + } } \ No newline at end of file diff --git a/src/test/kotlin/io/reactivex/rxkotlin/ObservableTest.kt b/src/test/kotlin/io/reactivex/rxkotlin/ObservableTest.kt index 69f5d2a..b80e069 100644 --- a/src/test/kotlin/io/reactivex/rxkotlin/ObservableTest.kt +++ b/src/test/kotlin/io/reactivex/rxkotlin/ObservableTest.kt @@ -1,7 +1,6 @@ package io.reactivex.rxkotlin import io.reactivex.Observable -import io.reactivex.observers.TestObserver import org.junit.Assert.assertEquals import org.junit.Assert.assertNotNull import org.junit.Ignore @@ -172,7 +171,22 @@ class ObservableTest { .assertValues(BigDecimal.valueOf(15, 1), 2, BigDecimal.valueOf(42), 15) .assertNoErrors() .assertComplete() + } + @Test fun combineLatestPair() { + Observable.just(3) + .combineLatest(Observable.just(10)) + .map { (x, y) -> x * y } + .test() + .assertValues(30) + } + + @Test fun combineLatestTriple() { + Observable.just(3) + .combineLatest(Observable.just(10), Observable.just(20)) + .map { (x, y, z) -> x * y * z } + .test() + .assertValues(600) } } \ No newline at end of file