From 8d2ee01ae5e0b152a48b03ea426b491b3d860973 Mon Sep 17 00:00:00 2001 From: Michiel Vermeersch Date: Tue, 10 Aug 2021 09:06:54 +0200 Subject: [PATCH] Added subscribeBy(DisposableContainer, [...]) extensions. --- build.gradle.kts | 4 +- .../reactivex/rxjava3/kotlin/subscribers.kt | 118 +++++++++--- .../kotlin/CompletableConsumersTest.kt | 133 +++++++++++++ .../rxjava3/kotlin/ExtensionTests.kt | 12 +- .../rxjava3/kotlin/FlowableConsumersTest.kt | 181 ++++++++++++++++++ .../rxjava3/kotlin/MaybeConsumersTest.kt | 154 +++++++++++++++ .../rxjava3/kotlin/ObservableConsumersTest.kt | 181 ++++++++++++++++++ .../rxjava3/kotlin/SingleConsumersTest.kt | 100 ++++++++++ 8 files changed, 844 insertions(+), 39 deletions(-) create mode 100644 src/test/kotlin/io/reactivex/rxjava3/kotlin/CompletableConsumersTest.kt create mode 100644 src/test/kotlin/io/reactivex/rxjava3/kotlin/FlowableConsumersTest.kt create mode 100644 src/test/kotlin/io/reactivex/rxjava3/kotlin/MaybeConsumersTest.kt create mode 100644 src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableConsumersTest.kt create mode 100644 src/test/kotlin/io/reactivex/rxjava3/kotlin/SingleConsumersTest.kt diff --git a/build.gradle.kts b/build.gradle.kts index 829f306..c470248 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -41,7 +41,6 @@ dependencies { api("io.reactivex.rxjava3:rxjava:3.1.0") implementation(kotlin("stdlib")) - testImplementation("org.funktionale:funktionale-partials:1.0.0-final") testImplementation("junit:junit:4.12") testImplementation("org.mockito:mockito-core:1.10.19") @@ -60,7 +59,6 @@ val sourcesJar by tasks.creating(Jar::class) { val dokka by tasks.getting(DokkaTask::class) { outputFormat = "html" outputDirectory = "$buildDir/javadoc" - } //documentation @@ -145,7 +143,7 @@ bintray { setPublications(if (isRelease) release else snapshot) -// dryRun = true + // dryRun = true with(pkg) { userOrg = "reactivex" diff --git a/src/main/kotlin/io/reactivex/rxjava3/kotlin/subscribers.kt b/src/main/kotlin/io/reactivex/rxjava3/kotlin/subscribers.kt index c333f2a..6f1b1a1 100755 --- a/src/main/kotlin/io/reactivex/rxjava3/kotlin/subscribers.kt +++ b/src/main/kotlin/io/reactivex/rxjava3/kotlin/subscribers.kt @@ -6,11 +6,11 @@ import io.reactivex.rxjava3.annotations.CheckReturnValue import io.reactivex.rxjava3.annotations.SchedulerSupport import io.reactivex.rxjava3.core.* import io.reactivex.rxjava3.disposables.Disposable +import io.reactivex.rxjava3.disposables.DisposableContainer import io.reactivex.rxjava3.functions.Action import io.reactivex.rxjava3.functions.Consumer import io.reactivex.rxjava3.internal.functions.Functions - private val onNextStub: (Any) -> Unit = {} private val onErrorStub: (Throwable) -> Unit = {} private val onCompleteStub: () -> Unit = {} @@ -33,9 +33,9 @@ private fun (() -> Unit).asOnCompleteAction(): Action { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) fun Observable.subscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onComplete: () -> Unit = onCompleteStub, - onNext: (T) -> Unit = onNextStub + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onNext: (T) -> Unit = onNextStub ): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction()) /** @@ -45,9 +45,9 @@ fun Observable.subscribeBy( @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) @SchedulerSupport(SchedulerSupport.NONE) fun Flowable.subscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onComplete: () -> Unit = onCompleteStub, - onNext: (T) -> Unit = onNextStub + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onNext: (T) -> Unit = onNextStub ): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction()) /** @@ -56,8 +56,8 @@ fun Flowable.subscribeBy( @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) fun Single.subscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onSuccess: (T) -> Unit = onNextStub + onError: (Throwable) -> Unit = onErrorStub, + onSuccess: (T) -> Unit = onNextStub ): Disposable = subscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer()) /** @@ -66,9 +66,9 @@ fun Single.subscribeBy( @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) fun Maybe.subscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onComplete: () -> Unit = onCompleteStub, - onSuccess: (T) -> Unit = onNextStub + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onSuccess: (T) -> Unit = onNextStub ): Disposable = subscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction()) /** @@ -77,8 +77,8 @@ fun Maybe.subscribeBy( @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) fun Completable.subscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onComplete: () -> Unit = onCompleteStub + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub ): Disposable = when { // There are optimized versions of the completable Consumers, so we need to use the subscribe overloads // here. @@ -87,14 +87,74 @@ fun Completable.subscribeBy( else -> subscribe(onComplete.asOnCompleteAction(), Consumer(onError)) } +/** + * Overloaded subscribe function that allows passing named parameters + */ +@CheckReturnValue +@SchedulerSupport(SchedulerSupport.NONE) +fun Observable.subscribeBy( + container: DisposableContainer, + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onNext: (T) -> Unit = onNextStub +): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction(), container) + +/** + * Overloaded subscribe function that allows passing named parameters + */ +@CheckReturnValue +@BackpressureSupport(BackpressureKind.UNBOUNDED_IN) +@SchedulerSupport(SchedulerSupport.NONE) +fun Flowable.subscribeBy( + container: DisposableContainer, + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onNext: (T) -> Unit = onNextStub +): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction(), container) + +/** + * Overloaded subscribe function that allows passing named parameters + */ +@CheckReturnValue +@SchedulerSupport(SchedulerSupport.NONE) +fun Single.subscribeBy( + container: DisposableContainer, + onError: (Throwable) -> Unit = onErrorStub, + onSuccess: (T) -> Unit = onNextStub +): Disposable = subscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), container) + +/** + * Overloaded subscribe function that allows passing named parameters + */ +@CheckReturnValue +@SchedulerSupport(SchedulerSupport.NONE) +fun Maybe.subscribeBy( + container: DisposableContainer, + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onSuccess: (T) -> Unit = onNextStub +): Disposable = + subscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction(), container) + +/** + * Overloaded subscribe function that allows passing named parameters + */ +@CheckReturnValue +@SchedulerSupport(SchedulerSupport.NONE) +fun Completable.subscribeBy( + container: DisposableContainer, + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub +): Disposable = subscribe(onComplete.asOnCompleteAction(), onError.asOnErrorConsumer(), container) + /** * Overloaded blockingSubscribe function that allows passing named parameters */ @SchedulerSupport(SchedulerSupport.NONE) fun Observable.blockingSubscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onComplete: () -> Unit = onCompleteStub, - onNext: (T) -> Unit = onNextStub + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onNext: (T) -> Unit = onNextStub ): Unit = blockingSubscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction()) /** @@ -103,9 +163,9 @@ fun Observable.blockingSubscribeBy( @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) @SchedulerSupport(SchedulerSupport.NONE) fun Flowable.blockingSubscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onComplete: () -> Unit = onCompleteStub, - onNext: (T) -> Unit = onNextStub + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onNext: (T) -> Unit = onNextStub ): Unit = blockingSubscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction()) /** @@ -113,25 +173,25 @@ fun Flowable.blockingSubscribeBy( */ @SchedulerSupport(SchedulerSupport.NONE) fun Maybe.blockingSubscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onComplete: () -> Unit = onCompleteStub, - onSuccess: (T) -> Unit = onNextStub -) : Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction()) + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub, + onSuccess: (T) -> Unit = onNextStub +): Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction()) /** * Overloaded blockingSubscribe function that allows passing named parameters */ @SchedulerSupport(SchedulerSupport.NONE) fun Single.blockingSubscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onSuccess: (T) -> Unit = onNextStub -) : Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer()) + onError: (Throwable) -> Unit = onErrorStub, + onSuccess: (T) -> Unit = onNextStub +): Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer()) /** * Overloaded blockingSubscribe function that allows passing named parameters */ @SchedulerSupport(SchedulerSupport.NONE) fun Completable.blockingSubscribeBy( - onError: (Throwable) -> Unit = onErrorStub, - onComplete: () -> Unit = onCompleteStub + onError: (Throwable) -> Unit = onErrorStub, + onComplete: () -> Unit = onCompleteStub ): Unit = blockingSubscribe(onComplete.asOnCompleteAction(), onError.asOnErrorConsumer()) diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/CompletableConsumersTest.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/CompletableConsumersTest.kt new file mode 100644 index 0000000..2a6467f --- /dev/null +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/CompletableConsumersTest.kt @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2021-present, RxKotlin Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.rxjava3.kotlin + +import io.reactivex.rxjava3.disposables.CompositeDisposable +import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection +import io.reactivex.rxjava3.subjects.CompletableSubject +import org.junit.Assert.* +import org.junit.Test +import java.io.IOException + +class CompletableConsumersTest { + + private fun CompositeDisposable.isEmpty(): Boolean = size() == 0 + private fun CompositeDisposable.isNotEmpty(): Boolean = size() > 0 + + private val disposables = CompositeDisposable() + private val subject = CompletableSubject.create() + private val events = mutableListOf() + + @Test + fun errorIntrospectionNormal() { + val disposable = subject.subscribeBy(disposables) as LambdaConsumerIntrospection + assertFalse(disposable.hasCustomOnError()) + } + + @Test + fun errorIntrospectionCustom() { + val disposable = subject.subscribeBy(disposables, onError = {}) as LambdaConsumerIntrospection + assertTrue(disposable.hasCustomOnError()) + } + + @Test + fun onErrorNormal() { + subject.subscribeBy( + disposables, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onComplete() + + assertTrue(disposables.isEmpty()) + assertTrue(events.isEmpty()) + } + + @Test + fun onErrorError() { + subject.subscribeBy( + disposables, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(1, events.size) + assertTrue(events[0] is IOException) + } + + @Test + fun onCompleteNormal() { + subject.subscribeBy( + disposables, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onComplete() + + assertTrue(disposables.isEmpty()) + assertEquals(listOf("completed"), events) + } + + @Test + fun onCompleteError() { + subject.subscribeBy( + disposables, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(1, events.size) + assertTrue(events[0] is IOException) + } + + @Test + fun onCompleteDispose() { + val disposable = subject.subscribeBy( + disposables, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertFalse(disposable.isDisposed) + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + disposable.dispose() + + assertTrue(disposable.isDisposed) + assertTrue(disposables.isEmpty()) + assertTrue(events.isEmpty()) + } +} diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/ExtensionTests.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/ExtensionTests.kt index be0aa2b..70cb384 100644 --- a/src/test/kotlin/io/reactivex/rxjava3/kotlin/ExtensionTests.kt +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/ExtensionTests.kt @@ -21,7 +21,6 @@ import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.core.ObservableEmitter import io.reactivex.rxjava3.schedulers.TestScheduler import io.reactivex.rxjava3.functions.* -import org.funktionale.partials.invoke import org.junit.Assert.assertEquals import org.junit.Assert.fail import org.junit.Test @@ -248,11 +247,6 @@ class ExtensionTests : KotlinTests() { inOrder.verifyNoMoreInteractions() } - val funOnSubscribe: (Int, ObservableEmitter) -> Unit = { counter, subscriber -> - subscriber.onNext("hello_$counter") - subscriber.onComplete() - } - val asyncObservable: (ObservableEmitter) -> Unit = { subscriber -> thread { Thread.sleep(50) @@ -270,7 +264,11 @@ class ExtensionTests : KotlinTests() { get() = listOf(1, 3, 2, 5, 4).toObservable() val onSubscribe: (ObservableEmitter) -> Unit - get() = funOnSubscribe(p1 = counter++) // partial applied function + get() = { + it.onNext("hello_$counter") + it.onComplete() + counter ++ + } val observable: Observable get() = Observable.create(onSubscribe) diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/FlowableConsumersTest.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/FlowableConsumersTest.kt new file mode 100644 index 0000000..46ef417 --- /dev/null +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/FlowableConsumersTest.kt @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2021-present, RxKotlin Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.rxjava3.kotlin + +import io.reactivex.rxjava3.disposables.CompositeDisposable +import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection +import io.reactivex.rxjava3.processors.PublishProcessor +import org.junit.Assert.* +import org.junit.Test +import java.io.IOException + +class FlowableConsumersTest { + + private fun CompositeDisposable.isEmpty(): Boolean = size() == 0 + private fun CompositeDisposable.isNotEmpty(): Boolean = size() > 0 + + private val disposables = CompositeDisposable() + private val processor = PublishProcessor.create() + private val events = mutableListOf() + + @Test + fun errorIntrospectionNormal() { + val disposable = processor.subscribeBy(disposables) as LambdaConsumerIntrospection + assertFalse(disposable.hasCustomOnError()) + } + + @Test + fun errorIntrospectionCustom() { + val disposable = processor.subscribeBy(disposables, onError = {}) as LambdaConsumerIntrospection + assertTrue(disposable.hasCustomOnError()) + } + + @Test + fun onNextNormal() { + processor.subscribeBy( + disposables, + onNext = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + processor.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + processor.onComplete() + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1), events) + } + + @Test + fun onErrorNormal() { + processor.subscribeBy( + disposables, + onNext = events::add, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + processor.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + processor.onComplete() + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1), events) + } + + @Test + fun onErrorError() { + processor.subscribeBy( + disposables, + onNext = events::add, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + processor.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + processor.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(2, events.size) + assertEquals(1, events[0]) + assertTrue(events[1] is IOException) + } + + @Test + fun onCompleteNormal() { + processor.subscribeBy( + disposables, + onNext = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + processor.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + processor.onComplete() + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1, "completed"), events) + } + + @Test + fun onCompleteError() { + processor.subscribeBy( + disposables, + onNext = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + processor.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + processor.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(2, events.size) + assertEquals(1, events[0]) + assertTrue(events[1] is IOException) + } + + @Test + fun onCompleteDispose() { + val disposable = processor.subscribeBy( + disposables, + onNext = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertFalse(disposable.isDisposed) + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + disposable.dispose() + + assertTrue(disposable.isDisposed) + assertTrue(disposables.isEmpty()) + assertTrue(events.isEmpty()) + } +} diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/MaybeConsumersTest.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/MaybeConsumersTest.kt new file mode 100644 index 0000000..f6adbce --- /dev/null +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/MaybeConsumersTest.kt @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2021-present, RxKotlin Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.rxjava3.kotlin + +import io.reactivex.rxjava3.disposables.CompositeDisposable +import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection +import io.reactivex.rxjava3.subjects.MaybeSubject +import org.junit.Assert.* +import org.junit.Test +import java.io.IOException + +class MaybeConsumersTest { + + private fun CompositeDisposable.isEmpty(): Boolean = size() == 0 + private fun CompositeDisposable.isNotEmpty(): Boolean = size() > 0 + + private val disposables = CompositeDisposable() + private val subject = MaybeSubject.create() + private val events = mutableListOf() + + @Test + fun errorIntrospectionNormal() { + val disposable = subject.subscribeBy(disposables) as LambdaConsumerIntrospection + assertFalse(disposable.hasCustomOnError()) + } + + @Test + fun errorIntrospectionCustom() { + val disposable = subject.subscribeBy(disposables, onError = {}) as LambdaConsumerIntrospection + assertTrue(disposable.hasCustomOnError()) + } + + @Test + fun onSuccessNormal() { + subject.subscribeBy( + disposables, + onSuccess = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onSuccess(1) + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1), events) + } + + @Test + fun onErrorNormal() { + subject.subscribeBy( + disposables, + onSuccess = events::add, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onSuccess(1) + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1), events) + } + + @Test + fun onErrorError() { + subject.subscribeBy( + disposables, + onSuccess = events::add, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(1, events.size) + assertTrue(events[0] is IOException) + } + + @Test + fun onCompleteNormal() { + subject.subscribeBy( + disposables, + onSuccess = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onComplete() + + assertTrue(disposables.isEmpty()) + assertEquals(listOf("completed"), events) + } + + @Test + fun onCompleteError() { + subject.subscribeBy( + disposables, + onSuccess = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(1, events.size) + assertTrue(events[0] is IOException) + } + + @Test + fun onCompleteDispose() { + val disposable = subject.subscribeBy( + disposables, + onSuccess = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertFalse(disposable.isDisposed) + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + disposable.dispose() + + assertTrue(disposable.isDisposed) + assertTrue(disposables.isEmpty()) + assertTrue(events.isEmpty()) + } +} diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableConsumersTest.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableConsumersTest.kt new file mode 100644 index 0000000..cc5fac3 --- /dev/null +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/ObservableConsumersTest.kt @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2021-present, RxKotlin Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.rxjava3.kotlin + +import io.reactivex.rxjava3.disposables.CompositeDisposable +import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection +import io.reactivex.rxjava3.subjects.PublishSubject +import org.junit.Assert.* +import org.junit.Test +import java.io.IOException + +class ObservableConsumersTest { + + private fun CompositeDisposable.isEmpty(): Boolean = size() == 0 + private fun CompositeDisposable.isNotEmpty(): Boolean = size() > 0 + + private val disposables = CompositeDisposable() + private val subject = PublishSubject.create() + private val events = mutableListOf() + + @Test + fun errorIntrospectionNormal() { + val disposable = subject.subscribeBy(disposables) as LambdaConsumerIntrospection + assertFalse(disposable.hasCustomOnError()) + } + + @Test + fun errorIntrospectionCustom() { + val disposable = subject.subscribeBy(disposables, onError = {}) as LambdaConsumerIntrospection + assertTrue(disposable.hasCustomOnError()) + } + + @Test + fun onNextNormal() { + subject.subscribeBy( + disposables, + onNext = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + subject.onComplete() + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1), events) + } + + @Test + fun onErrorNormal() { + subject.subscribeBy( + disposables, + onNext = events::add, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + subject.onComplete() + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1), events) + } + + @Test + fun onErrorError() { + subject.subscribeBy( + disposables, + onNext = events::add, + onError = events::add + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + subject.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(2, events.size) + assertEquals(1, events[0]) + assertTrue(events[1] is IOException) + } + + @Test + fun onCompleteNormal() { + subject.subscribeBy( + disposables, + onNext = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + subject.onComplete() + + assertTrue(disposables.isEmpty()) + assertEquals(listOf(1, "completed"), events) + } + + @Test + fun onCompleteError() { + subject.subscribeBy( + disposables, + onNext = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + subject.onNext(1) + + assertTrue(disposables.isNotEmpty()) + assertEquals(listOf(1), events) + + subject.onError(IOException()) + + assertTrue(disposables.isEmpty()) + assertEquals(2, events.size) + assertEquals(1, events[0]) + assertTrue(events[1] is IOException) + } + + @Test + fun onCompleteDispose() { + val disposable = subject.subscribeBy( + disposables, + onNext = events::add, + onError = events::add, + onComplete = { events.add("completed") } + ) + + assertFalse(disposable.isDisposed) + assertTrue(disposables.isNotEmpty()) + assertTrue(events.isEmpty()) + + disposable.dispose() + + assertTrue(disposable.isDisposed) + assertTrue(disposables.isEmpty()) + assertTrue(events.isEmpty()) + } +} diff --git a/src/test/kotlin/io/reactivex/rxjava3/kotlin/SingleConsumersTest.kt b/src/test/kotlin/io/reactivex/rxjava3/kotlin/SingleConsumersTest.kt new file mode 100644 index 0000000..b9d4791 --- /dev/null +++ b/src/test/kotlin/io/reactivex/rxjava3/kotlin/SingleConsumersTest.kt @@ -0,0 +1,100 @@ +package io.reactivex.rxjava3.kotlin + +import io.reactivex.rxjava3.disposables.CompositeDisposable +import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection +import io.reactivex.rxjava3.subjects.SingleSubject +import org.junit.Assert +import org.junit.Test +import java.io.IOException + +class SingleConsumersTest { + + private fun CompositeDisposable.isEmpty(): Boolean = size() == 0 + private fun CompositeDisposable.isNotEmpty(): Boolean = size() > 0 + + private val disposables = CompositeDisposable() + private val subject = SingleSubject.create() + private val events = mutableListOf() + + @Test + fun errorIntrospectionNormal() { + val disposable = subject.subscribeBy(disposables) as LambdaConsumerIntrospection + Assert.assertFalse(disposable.hasCustomOnError()) + } + + @Test + fun errorIntrospectionCustom() { + val disposable = subject.subscribeBy(disposables, onError = {}) as LambdaConsumerIntrospection + Assert.assertTrue(disposable.hasCustomOnError()) + } + + @Test + fun onSuccessNormal() { + subject.subscribeBy( + disposables, + onSuccess = events::add + ) + + Assert.assertTrue(disposables.isNotEmpty()) + Assert.assertTrue(events.isEmpty()) + + subject.onSuccess(1) + + Assert.assertTrue(disposables.isEmpty()) + Assert.assertEquals(listOf(1), events) + } + + @Test + fun onErrorNormal() { + subject.subscribeBy( + disposables, + onSuccess = events::add, + onError = events::add + ) + + Assert.assertTrue(disposables.isNotEmpty()) + Assert.assertTrue(events.isEmpty()) + + subject.onSuccess(1) + + Assert.assertTrue(disposables.isEmpty()) + Assert.assertEquals(listOf(1), events) + } + + @Test + fun onErrorError() { + subject.subscribeBy( + disposables, + onSuccess = events::add, + onError = events::add + ) + + Assert.assertTrue(disposables.isNotEmpty()) + Assert.assertTrue(events.isEmpty()) + + subject.onError(IOException()) + + Assert.assertTrue(disposables.isEmpty()) + Assert.assertEquals(1, events.size) + Assert.assertTrue(events[0] is IOException) + } + + @Test + fun onCompleteDispose() { + val disposable = subject.subscribeBy( + disposables, + onSuccess = events::add, + onError = events::add + ) + + Assert.assertFalse(disposable.isDisposed) + Assert.assertTrue(disposables.isNotEmpty()) + Assert.assertTrue(events.isEmpty()) + + disposable.dispose() + + Assert.assertTrue(disposable.isDisposed) + Assert.assertTrue(disposables.isEmpty()) + Assert.assertTrue(events.isEmpty()) + } +}