From bf9a5416c8331ebf633bff3c0481b10c424dcf30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Wed, 13 May 2020 16:10:16 +0200 Subject: [PATCH] Copy internal helper methods from RxJava to avoid OSGi problems --- README.md | 2 +- gradle.properties | 4 +- .../rxjava3/interop/BackpressureHelper.java | 156 + .../interop/CompletableV1ToCompletableV3.java | 2 +- .../interop/CompletableV1ToMaybeV3.java | 2 +- .../interop/CompletableV3ToCompletableV1.java | 2 +- .../rxjava3/interop/DisposableHelper.java | 141 + .../interop/DisposableV3ToSubscriptionV1.java | 2 +- .../interop/FlowableV3ToObservableV1.java | 10 +- .../interop/MaybeV3ToCompletableV1.java | 8 +- .../rxjava3/interop/MaybeV3ToSingleV1.java | 8 +- .../interop/ObservableV1ToFlowableV3.java | 2 +- .../interop/ObservableV1ToObservableV3.java | 2 +- .../interop/ProcessorV3ToSubjectV1.java | 2 +- .../rxjava3/interop/RxJavaInterop.java | 2 +- .../interop/SchedulerV1ToSchedulerV3.java | 2 +- .../interop/SchedulerV3ToSchedulerV1.java | 2 +- .../rxjava3/interop/SingleV1ToMaybeV3.java | 2 +- .../rxjava3/interop/SingleV1ToSingleV3.java | 2 +- .../rxjava3/interop/SingleV3ToSingleV1.java | 8 +- .../interop/SubjectV1ToProcessorV3.java | 2 +- .../rxjava3/interop/SubjectV1ToSubjectV3.java | 2 +- .../rxjava3/interop/SubjectV3ToSubjectV1.java | 16 +- .../rxjava3/interop/SubscriptionHelper.java | 197 ++ .../interop/SubscriptionV1ToDisposableV3.java | 2 +- .../interop/BackpressureHelperTest.java | 184 + .../rxjava3/interop/DisposableHelperTest.java | 139 + .../rxjava3/interop/RxJavaInteropTest.java | 7 +- ...vaInteropV1SchedulerToV3SchedulerTest.java | 17 +- ...vaInteropV3SchedulerToV1SchedulerTest.java | 14 +- .../interop/SubscriptionHelperTest.java | 170 + .../rxjava3/interop/TestException.java | 58 + .../akarnokd/rxjava3/interop/TestHelper.java | 3094 +++++++++++++++++ 33 files changed, 4207 insertions(+), 56 deletions(-) create mode 100644 src/main/java/hu/akarnokd/rxjava3/interop/BackpressureHelper.java create mode 100644 src/main/java/hu/akarnokd/rxjava3/interop/DisposableHelper.java create mode 100644 src/main/java/hu/akarnokd/rxjava3/interop/SubscriptionHelper.java create mode 100644 src/test/java/hu/akarnokd/rxjava3/interop/BackpressureHelperTest.java create mode 100644 src/test/java/hu/akarnokd/rxjava3/interop/DisposableHelperTest.java create mode 100644 src/test/java/hu/akarnokd/rxjava3/interop/SubscriptionHelperTest.java create mode 100644 src/test/java/hu/akarnokd/rxjava3/interop/TestException.java create mode 100644 src/test/java/hu/akarnokd/rxjava3/interop/TestHelper.java diff --git a/README.md b/README.md index 8d56db5..4e0fcc7 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ Check out the [https://github.com/akarnokd/RxJavaBridge](https://github.com/akar ``` dependencies { - compile "com.github.akarnokd:rxjava3-interop:3.0.0" + compile "com.github.akarnokd:rxjava3-interop:3.0.1" } ``` diff --git a/gradle.properties b/gradle.properties index 5e83503..7643af4 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ GROUP=com.github.akarnokd -VERSION_NAME=3.0.0 -version=3.0.0 +VERSION_NAME=3.0.1 +version=3.0.1 POM_ARTIFACT_ID=rxjava3-interop POM_NAME=Interop library between RxJava 1.x and 3.x diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/BackpressureHelper.java b/src/main/java/hu/akarnokd/rxjava3/interop/BackpressureHelper.java new file mode 100644 index 0000000..f8a74bf --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/interop/BackpressureHelper.java @@ -0,0 +1,156 @@ +/* + * Copyright 2016-2020 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.interop; + +import java.util.concurrent.atomic.AtomicLong; + +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Utility class to help with backpressure-related operations such as request aggregation. + */ +final class BackpressureHelper { + /** Utility class. */ + private BackpressureHelper() { + throw new IllegalStateException("No instances!"); + } + + /** + * Adds two long values and caps the sum at {@link Long#MAX_VALUE}. + * @param a the first value + * @param b the second value + * @return the sum capped at {@link Long#MAX_VALUE} + */ + public static long addCap(long a, long b) { + long u = a + b; + if (u < 0L) { + return Long.MAX_VALUE; + } + return u; + } + + /** + * Multiplies two long values and caps the product at {@link Long#MAX_VALUE}. + * @param a the first value + * @param b the second value + * @return the product capped at {@link Long#MAX_VALUE} + */ + public static long multiplyCap(long a, long b) { + long u = a * b; + if (((a | b) >>> 31) != 0) { + if (u / a != b) { + return Long.MAX_VALUE; + } + } + return u; + } + + /** + * Atomically adds the positive value n to the requested value in the {@link AtomicLong} and + * caps the result at {@link Long#MAX_VALUE} and returns the previous value. + * @param requested the {@code AtomicLong} holding the current requested value + * @param n the value to add, must be positive (not verified) + * @return the original value before the add + */ + public static long add(@NonNull AtomicLong requested, long n) { + for (;;) { + long r = requested.get(); + if (r == Long.MAX_VALUE) { + return Long.MAX_VALUE; + } + long u = addCap(r, n); + if (requested.compareAndSet(r, u)) { + return r; + } + } + } + + /** + * Atomically adds the positive value n to the requested value in the {@link AtomicLong} and + * caps the result at {@link Long#MAX_VALUE} and returns the previous value and + * considers {@link Long#MIN_VALUE} as a cancel indication (no addition then). + * @param requested the {@code AtomicLong} holding the current requested value + * @param n the value to add, must be positive (not verified) + * @return the original value before the add + */ + public static long addCancel(@NonNull AtomicLong requested, long n) { + for (;;) { + long r = requested.get(); + if (r == Long.MIN_VALUE) { + return Long.MIN_VALUE; + } + if (r == Long.MAX_VALUE) { + return Long.MAX_VALUE; + } + long u = addCap(r, n); + if (requested.compareAndSet(r, u)) { + return r; + } + } + } + + /** + * Atomically subtract the given number (positive, not validated) from the target field unless it contains {@link Long#MAX_VALUE}. + * @param requested the target field holding the current requested amount + * @param n the produced element count, positive (not validated) + * @return the new amount + */ + public static long produced(@NonNull AtomicLong requested, long n) { + for (;;) { + long current = requested.get(); + if (current == Long.MAX_VALUE) { + return Long.MAX_VALUE; + } + long update = current - n; + if (update < 0L) { + RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update)); + update = 0L; + } + if (requested.compareAndSet(current, update)) { + return update; + } + } + } + + /** + * Atomically subtract the given number (positive, not validated) from the target field if + * it doesn't contain {@link Long#MIN_VALUE} (indicating some cancelled state) or {@link Long#MAX_VALUE} (unbounded mode). + * @param requested the target field holding the current requested amount + * @param n the produced element count, positive (not validated) + * @return the new amount + */ + public static long producedCancel(@NonNull AtomicLong requested, long n) { + for (;;) { + long current = requested.get(); + if (current == Long.MIN_VALUE) { + return Long.MIN_VALUE; + } + if (current == Long.MAX_VALUE) { + return Long.MAX_VALUE; + } + long update = current - n; + if (update < 0L) { + RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update)); + update = 0L; + } + if (requested.compareAndSet(current, update)) { + return update; + } + } + } +} \ No newline at end of file diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/CompletableV1ToCompletableV3.java b/src/main/java/hu/akarnokd/rxjava3/interop/CompletableV1ToCompletableV3.java index 8086c2e..f0e3c4e 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/CompletableV1ToCompletableV3.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/CompletableV1ToCompletableV3.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/CompletableV1ToMaybeV3.java b/src/main/java/hu/akarnokd/rxjava3/interop/CompletableV1ToMaybeV3.java index 3323f9d..3dd9dac 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/CompletableV1ToMaybeV3.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/CompletableV1ToMaybeV3.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/CompletableV3ToCompletableV1.java b/src/main/java/hu/akarnokd/rxjava3/interop/CompletableV3ToCompletableV1.java index baa225f..18fd660 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/CompletableV3ToCompletableV1.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/CompletableV3ToCompletableV1.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/DisposableHelper.java b/src/main/java/hu/akarnokd/rxjava3/interop/DisposableHelper.java new file mode 100644 index 0000000..3197c20 --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/interop/DisposableHelper.java @@ -0,0 +1,141 @@ +/* + * Copyright 2016-2020 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.interop; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.ProtocolViolationException; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Utility methods for working with Disposables atomically. + */ +enum DisposableHelper implements Disposable { + /** + * The singleton instance representing a terminal, disposed state, don't leak it. + */ + DISPOSED + ; + + /** + * Checks if the given Disposable is the common {@link #DISPOSED} enum value. + * @param d the disposable to check + * @return true if d is {@link #DISPOSED} + */ + public static boolean isDisposed(Disposable d) { + return d == DISPOSED; + } + + /** + * Atomically sets the field to the given non-null Disposable and returns true + * or returns false if the field is non-null. + * If the target field contains the common DISPOSED instance, the supplied disposable + * is disposed. If the field contains other non-null Disposable, an IllegalStateException + * is signalled to the RxJavaPlugins.onError hook. + * + * @param field the target field + * @param d the disposable to set, not null + * @return true if the operation succeeded, false + */ + public static boolean setOnce(AtomicReference field, Disposable d) { + Objects.requireNonNull(d, "d is null"); + if (!field.compareAndSet(null, d)) { + d.dispose(); + if (field.get() != DISPOSED) { + reportDisposableSet(); + } + return false; + } + return true; + } + + /** + * Atomically disposes the Disposable in the field if not already disposed. + * @param field the target field + * @return true if the current thread managed to dispose the Disposable + */ + public static boolean dispose(AtomicReference field) { + Disposable current = field.get(); + Disposable d = DISPOSED; + if (current != d) { + current = field.getAndSet(d); + if (current != d) { + if (current != null) { + current.dispose(); + } + return true; + } + } + return false; + } + + /** + * Verifies that current is null, next is not null, otherwise signals errors + * to the RxJavaPlugins and returns false. + * @param current the current Disposable, expected to be null + * @param next the next Disposable, expected to be non-null + * @return true if the validation succeeded + */ + public static boolean validate(Disposable current, Disposable next) { + if (next == null) { + RxJavaPlugins.onError(new NullPointerException("next is null")); + return false; + } + if (current != null) { + next.dispose(); + reportDisposableSet(); + return false; + } + return true; + } + + /** + * Reports that the disposable is already set to the RxJavaPlugins error handler. + */ + public static void reportDisposableSet() { + RxJavaPlugins.onError(new ProtocolViolationException("Disposable already set!")); + } + + /** + * Atomically tries to set the given Disposable on the field if it is null or disposes it if + * the field contains {@link #DISPOSED}. + * @param field the target field + * @param d the disposable to set + * @return true if successful, false otherwise + */ + public static boolean trySet(AtomicReference field, Disposable d) { + if (!field.compareAndSet(null, d)) { + if (field.get() == DISPOSED) { + d.dispose(); + } + return false; + } + return true; + } + + @Override + public void dispose() { + // deliberately no-op + } + + @Override + public boolean isDisposed() { + return true; + } +} \ No newline at end of file diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/DisposableV3ToSubscriptionV1.java b/src/main/java/hu/akarnokd/rxjava3/interop/DisposableV3ToSubscriptionV1.java index a27b23c..0d6e5e6 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/DisposableV3ToSubscriptionV1.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/DisposableV3ToSubscriptionV1.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/FlowableV3ToObservableV1.java b/src/main/java/hu/akarnokd/rxjava3/interop/FlowableV3ToObservableV1.java index 1813614..07c4ea6 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/FlowableV3ToObservableV1.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/FlowableV3ToObservableV1.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -58,23 +58,23 @@ static final class SourceSubscriber @Override public void request(long n) { if (n != 0L) { - io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper.deferredRequest(this, requested, n); + SubscriptionHelper.deferredRequest(this, requested, n); } } @Override public void unsubscribe() { - io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper.cancel(this); + SubscriptionHelper.cancel(this); } @Override public boolean isUnsubscribed() { - return io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper.CANCELLED == get(); + return SubscriptionHelper.CANCELLED == get(); } @Override public void onSubscribe(org.reactivestreams.Subscription s) { - io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper.deferredSetOnce(this, requested, s); + SubscriptionHelper.deferredSetOnce(this, requested, s); } @Override diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/MaybeV3ToCompletableV1.java b/src/main/java/hu/akarnokd/rxjava3/interop/MaybeV3ToCompletableV1.java index b3d94b2..984b12f 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/MaybeV3ToCompletableV1.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/MaybeV3ToCompletableV1.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,17 +52,17 @@ static final class MaybeV3Observer @Override public void unsubscribe() { - io.reactivex.rxjava3.internal.disposables.DisposableHelper.dispose(this); + DisposableHelper.dispose(this); } @Override public boolean isUnsubscribed() { - return io.reactivex.rxjava3.internal.disposables.DisposableHelper.isDisposed(get()); + return DisposableHelper.isDisposed(get()); } @Override public void onSubscribe(io.reactivex.rxjava3.disposables.Disposable d) { - io.reactivex.rxjava3.internal.disposables.DisposableHelper.setOnce(this, d); + DisposableHelper.setOnce(this, d); } @Override diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/MaybeV3ToSingleV1.java b/src/main/java/hu/akarnokd/rxjava3/interop/MaybeV3ToSingleV1.java index acceb1a..030c740 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/MaybeV3ToSingleV1.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/MaybeV3ToSingleV1.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,17 +53,17 @@ static final class MaybeV3Observer @Override public void unsubscribe() { - io.reactivex.rxjava3.internal.disposables.DisposableHelper.dispose(this); + DisposableHelper.dispose(this); } @Override public boolean isUnsubscribed() { - return io.reactivex.rxjava3.internal.disposables.DisposableHelper.isDisposed(get()); + return DisposableHelper.isDisposed(get()); } @Override public void onSubscribe(io.reactivex.rxjava3.disposables.Disposable d) { - io.reactivex.rxjava3.internal.disposables.DisposableHelper.setOnce(this, d); + DisposableHelper.setOnce(this, d); } @Override diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/ObservableV1ToFlowableV3.java b/src/main/java/hu/akarnokd/rxjava3/interop/ObservableV1ToFlowableV3.java index 3d42d3f..7287a4b 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/ObservableV1ToFlowableV3.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/ObservableV1ToFlowableV3.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/ObservableV1ToObservableV3.java b/src/main/java/hu/akarnokd/rxjava3/interop/ObservableV1ToObservableV3.java index 0b87b5c..55ac140 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/ObservableV1ToObservableV3.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/ObservableV1ToObservableV3.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/ProcessorV3ToSubjectV1.java b/src/main/java/hu/akarnokd/rxjava3/interop/ProcessorV3ToSubjectV1.java index d052faf..27b1de8 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/ProcessorV3ToSubjectV1.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/ProcessorV3ToSubjectV1.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/RxJavaInterop.java b/src/main/java/hu/akarnokd/rxjava3/interop/RxJavaInterop.java index 8b512df..74c4529 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/RxJavaInterop.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/RxJavaInterop.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/SchedulerV1ToSchedulerV3.java b/src/main/java/hu/akarnokd/rxjava3/interop/SchedulerV1ToSchedulerV3.java index df61c7c..de1b7d3 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/SchedulerV1ToSchedulerV3.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/SchedulerV1ToSchedulerV3.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/SchedulerV3ToSchedulerV1.java b/src/main/java/hu/akarnokd/rxjava3/interop/SchedulerV3ToSchedulerV1.java index 6493753..343274f 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/SchedulerV3ToSchedulerV1.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/SchedulerV3ToSchedulerV1.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/SingleV1ToMaybeV3.java b/src/main/java/hu/akarnokd/rxjava3/interop/SingleV1ToMaybeV3.java index cd50fdd..6c98877 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/SingleV1ToMaybeV3.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/SingleV1ToMaybeV3.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/SingleV1ToSingleV3.java b/src/main/java/hu/akarnokd/rxjava3/interop/SingleV1ToSingleV3.java index f6c9238..58c4c12 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/SingleV1ToSingleV3.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/SingleV1ToSingleV3.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/SingleV3ToSingleV1.java b/src/main/java/hu/akarnokd/rxjava3/interop/SingleV3ToSingleV1.java index 863427d..33ce04a 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/SingleV3ToSingleV1.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/SingleV3ToSingleV1.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,17 +52,17 @@ static final class SourceSingleObserver @Override public void unsubscribe() { - io.reactivex.rxjava3.internal.disposables.DisposableHelper.dispose(this); + DisposableHelper.dispose(this); } @Override public boolean isUnsubscribed() { - return io.reactivex.rxjava3.internal.disposables.DisposableHelper.isDisposed(get()); + return DisposableHelper.isDisposed(get()); } @Override public void onSubscribe(io.reactivex.rxjava3.disposables.Disposable d) { - io.reactivex.rxjava3.internal.disposables.DisposableHelper.setOnce(this, d); + DisposableHelper.setOnce(this, d); } @Override diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/SubjectV1ToProcessorV3.java b/src/main/java/hu/akarnokd/rxjava3/interop/SubjectV1ToProcessorV3.java index 1cafe89..abbe43f 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/SubjectV1ToProcessorV3.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/SubjectV1ToProcessorV3.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/SubjectV1ToSubjectV3.java b/src/main/java/hu/akarnokd/rxjava3/interop/SubjectV1ToSubjectV3.java index 140b66d..2ac763f 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/SubjectV1ToSubjectV3.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/SubjectV1ToSubjectV3.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/SubjectV3ToSubjectV1.java b/src/main/java/hu/akarnokd/rxjava3/interop/SubjectV3ToSubjectV1.java index 609f3f3..32de960 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/SubjectV3ToSubjectV1.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/SubjectV3ToSubjectV1.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -111,30 +111,30 @@ static final class SourceObserver @Override public void request(long n) { if (n > 0L) { - io.reactivex.rxjava3.internal.util.BackpressureHelper.add(requested, n); + BackpressureHelper.add(requested, n); } } @Override public void unsubscribe() { - io.reactivex.rxjava3.internal.disposables.DisposableHelper.dispose(this); + DisposableHelper.dispose(this); } @Override public boolean isUnsubscribed() { - return io.reactivex.rxjava3.internal.disposables.DisposableHelper.isDisposed(get()); + return DisposableHelper.isDisposed(get()); } @Override public void onSubscribe(io.reactivex.rxjava3.disposables.Disposable d) { - io.reactivex.rxjava3.internal.disposables.DisposableHelper.setOnce(this, d); + DisposableHelper.setOnce(this, d); } @Override public void onNext(T t) { if (requested.get() != 0) { actual.onNext(t); - io.reactivex.rxjava3.internal.util.BackpressureHelper.produced(requested, 1); + BackpressureHelper.produced(requested, 1); } else { unsubscribe(); actual.onError(new rx.exceptions.MissingBackpressureException()); @@ -143,13 +143,13 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - lazySet(io.reactivex.rxjava3.internal.disposables.DisposableHelper.DISPOSED); + lazySet(DisposableHelper.DISPOSED); actual.onError(t); } @Override public void onComplete() { - lazySet(io.reactivex.rxjava3.internal.disposables.DisposableHelper.DISPOSED); + lazySet(DisposableHelper.DISPOSED); actual.onCompleted(); } } diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/SubscriptionHelper.java b/src/main/java/hu/akarnokd/rxjava3/interop/SubscriptionHelper.java new file mode 100644 index 0000000..7b41203 --- /dev/null +++ b/src/main/java/hu/akarnokd/rxjava3/interop/SubscriptionHelper.java @@ -0,0 +1,197 @@ +/* + * Copyright 2016-2020 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.interop; + +import java.util.Objects; +import java.util.concurrent.atomic.*; + +import org.reactivestreams.Subscription; + +import io.reactivex.rxjava3.exceptions.ProtocolViolationException; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Utility methods to validate Subscriptions in the various onSubscribe calls. + */ +enum SubscriptionHelper implements Subscription { + /** + * Represents a cancelled Subscription. + *

Don't leak this instance! + */ + CANCELLED + ; + + @Override + public void request(long n) { + // deliberately ignored + } + + @Override + public void cancel() { + // deliberately ignored + } + + /** + * Verifies that current is null, next is not null, otherwise signals errors + * to the RxJavaPlugins and returns false. + * @param current the current Subscription, expected to be null + * @param next the next Subscription, expected to be non-null + * @return true if the validation succeeded + */ + public static boolean validate(Subscription current, Subscription next) { + if (next == null) { + RxJavaPlugins.onError(new NullPointerException("next is null")); + return false; + } + if (current != null) { + next.cancel(); + reportSubscriptionSet(); + return false; + } + return true; + } + + /** + * Reports that the subscription is already set to the RxJavaPlugins error handler, + * which is an indication of a onSubscribe management bug. + */ + public static void reportSubscriptionSet() { + RxJavaPlugins.onError(new ProtocolViolationException("Subscription already set!")); + } + + /** + * Validates that the n is positive. + * @param n the request amount + * @return false if n is non-positive. + */ + public static boolean validate(long n) { + if (n <= 0) { + RxJavaPlugins.onError(new IllegalArgumentException("n > 0 required but it was " + n)); + return false; + } + return true; + } + + + /** + * Atomically sets the subscription on the field if it is still null. + *

If the field is not null and doesn't contain the {@link #CANCELLED} + * instance, the {@link #reportSubscriptionSet()} is called. + * @param field the target field + * @param s the new subscription to set + * @return true if the operation succeeded, false if the target field was not null. + */ + public static boolean setOnce(AtomicReference field, Subscription s) { + Objects.requireNonNull(s, "s is null"); + if (!field.compareAndSet(null, s)) { + s.cancel(); + if (field.get() != CANCELLED) { + reportSubscriptionSet(); + } + return false; + } + return true; + } + + /** + * Atomically swaps in the common cancelled subscription instance + * and cancels the previous subscription if any. + * @param field the target field to dispose the contents of + * @return true if the swap from the non-cancelled instance to the + * common cancelled instance happened in the caller's thread (allows + * further one-time actions). + */ + public static boolean cancel(AtomicReference field) { + Subscription current = field.get(); + if (current != CANCELLED) { + current = field.getAndSet(CANCELLED); + if (current != CANCELLED) { + if (current != null) { + current.cancel(); + } + return true; + } + } + return false; + } + + /** + * Atomically sets the new Subscription on the field and requests any accumulated amount + * from the requested field. + * @param field the target field for the new Subscription + * @param requested the current requested amount + * @param s the new Subscription, not null (verified) + * @return true if the Subscription was set the first time + */ + public static boolean deferredSetOnce(AtomicReference field, AtomicLong requested, + Subscription s) { + if (SubscriptionHelper.setOnce(field, s)) { + long r = requested.getAndSet(0L); + if (r != 0L) { + s.request(r); + } + return true; + } + return false; + } + + /** + * Atomically requests from the Subscription in the field if not null, otherwise accumulates + * the request amount in the requested field to be requested once the field is set to non-null. + * @param field the target field that may already contain a Subscription + * @param requested the current requested amount + * @param n the request amount, positive (verified) + */ + public static void deferredRequest(AtomicReference field, AtomicLong requested, long n) { + Subscription s = field.get(); + if (s != null) { + s.request(n); + } else { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + + s = field.get(); + if (s != null) { + long r = requested.getAndSet(0L); + if (r != 0L) { + s.request(r); + } + } + } + } + } + + /** + * Atomically sets the subscription on the field if it is still null and issues a positive request + * to the given {@link Subscription}. + *

+ * If the field is not null and doesn't contain the {@link #CANCELLED} + * instance, the {@link #reportSubscriptionSet()} is called. + * @param field the target field + * @param s the new subscription to set + * @param request the amount to request, positive (not verified) + * @return true if the operation succeeded, false if the target field was not null. + * @since 2.1.11 + */ + public static boolean setOnce(AtomicReference field, Subscription s, long request) { + if (setOnce(field, s)) { + s.request(request); + return true; + } + return false; + } +} \ No newline at end of file diff --git a/src/main/java/hu/akarnokd/rxjava3/interop/SubscriptionV1ToDisposableV3.java b/src/main/java/hu/akarnokd/rxjava3/interop/SubscriptionV1ToDisposableV3.java index 9ec36b7..dfb0fc6 100644 --- a/src/main/java/hu/akarnokd/rxjava3/interop/SubscriptionV1ToDisposableV3.java +++ b/src/main/java/hu/akarnokd/rxjava3/interop/SubscriptionV1ToDisposableV3.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/hu/akarnokd/rxjava3/interop/BackpressureHelperTest.java b/src/test/java/hu/akarnokd/rxjava3/interop/BackpressureHelperTest.java new file mode 100644 index 0000000..230a81e --- /dev/null +++ b/src/test/java/hu/akarnokd/rxjava3/interop/BackpressureHelperTest.java @@ -0,0 +1,184 @@ +/* + * Copyright 2016-2020 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.interop; + +import static org.junit.Assert.assertEquals; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.Test; + +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +public class BackpressureHelperTest { + @Test + public void constructorShouldBePrivate() { + TestHelper.checkUtilityClass(BackpressureHelper.class); + } + + @Test + public void addCap() { + assertEquals(2L, BackpressureHelper.addCap(1, 1)); + assertEquals(Long.MAX_VALUE, BackpressureHelper.addCap(1, Long.MAX_VALUE - 1)); + assertEquals(Long.MAX_VALUE, BackpressureHelper.addCap(1, Long.MAX_VALUE)); + assertEquals(Long.MAX_VALUE, BackpressureHelper.addCap(Long.MAX_VALUE - 1, Long.MAX_VALUE - 1)); + assertEquals(Long.MAX_VALUE, BackpressureHelper.addCap(Long.MAX_VALUE, Long.MAX_VALUE)); + } + + @Test + public void multiplyCap() { + assertEquals(6, BackpressureHelper.multiplyCap(2, 3)); + assertEquals(Long.MAX_VALUE, BackpressureHelper.multiplyCap(2, Long.MAX_VALUE)); + assertEquals(Long.MAX_VALUE, BackpressureHelper.multiplyCap(Long.MAX_VALUE, Long.MAX_VALUE)); + assertEquals(Long.MAX_VALUE, BackpressureHelper.multiplyCap(1L << 32, 1L << 32)); + + } + + @Test + public void producedMore() { + List list = TestHelper.trackPluginErrors(); + + try { + AtomicLong requested = new AtomicLong(1); + + assertEquals(0, BackpressureHelper.produced(requested, 2)); + + TestHelper.assertError(list, 0, IllegalStateException.class, "More produced than requested: -1"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void producedMoreCancel() { + List list = TestHelper.trackPluginErrors(); + + try { + AtomicLong requested = new AtomicLong(1); + + assertEquals(0, BackpressureHelper.producedCancel(requested, 2)); + + TestHelper.assertError(list, 0, IllegalStateException.class, "More produced than requested: -1"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void requestProduceRace() { + final AtomicLong requested = new AtomicLong(1); + + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + + Runnable r1 = new Runnable() { + @Override + public void run() { + BackpressureHelper.produced(requested, 1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + BackpressureHelper.add(requested, 1); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void requestCancelProduceRace() { + final AtomicLong requested = new AtomicLong(1); + + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + + Runnable r1 = new Runnable() { + @Override + public void run() { + BackpressureHelper.produced(requested, 1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + BackpressureHelper.addCancel(requested, 1); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void utilityClass() { + TestHelper.checkUtilityClass(BackpressureHelper.class); + } + + @Test + public void capped() { + final AtomicLong requested = new AtomicLong(Long.MIN_VALUE); + + assertEquals(Long.MIN_VALUE, BackpressureHelper.addCancel(requested, 1)); + assertEquals(Long.MIN_VALUE, BackpressureHelper.addCancel(requested, Long.MAX_VALUE)); + + requested.set(0); + + assertEquals(0, BackpressureHelper.addCancel(requested, Long.MAX_VALUE)); + assertEquals(Long.MAX_VALUE, BackpressureHelper.addCancel(requested, 1)); + assertEquals(Long.MAX_VALUE, BackpressureHelper.addCancel(requested, Long.MAX_VALUE)); + + requested.set(0); + + assertEquals(0, BackpressureHelper.add(requested, Long.MAX_VALUE)); + assertEquals(Long.MAX_VALUE, BackpressureHelper.add(requested, 1)); + assertEquals(Long.MAX_VALUE, BackpressureHelper.add(requested, Long.MAX_VALUE)); + + assertEquals(Long.MAX_VALUE, BackpressureHelper.produced(requested, 1)); + assertEquals(Long.MAX_VALUE, BackpressureHelper.produced(requested, Long.MAX_VALUE)); + } + + @Test + public void multiplyCap2() { + assertEquals(Long.MAX_VALUE, BackpressureHelper.multiplyCap(3, Long.MAX_VALUE >> 1)); + + assertEquals(Long.MAX_VALUE, BackpressureHelper.multiplyCap(1, Long.MAX_VALUE)); + } + + @Test + public void alreadyNegativeMaxLong() { + assertEquals(Long.MIN_VALUE, BackpressureHelper.addCancel(new AtomicLong(Long.MIN_VALUE), 1)); + } + + @Test + public void alreadyMaxLong() { + assertEquals(Long.MAX_VALUE, BackpressureHelper.addCancel(new AtomicLong(Long.MAX_VALUE), 1)); + } + + @Test + public void producedNegativeMaxLong() { + assertEquals(Long.MIN_VALUE, BackpressureHelper.producedCancel(new AtomicLong(Long.MIN_VALUE), 1)); + } + + @Test + public void producedMaxLong() { + assertEquals(Long.MAX_VALUE, BackpressureHelper.producedCancel(new AtomicLong(Long.MAX_VALUE), 1)); + } +} \ No newline at end of file diff --git a/src/test/java/hu/akarnokd/rxjava3/interop/DisposableHelperTest.java b/src/test/java/hu/akarnokd/rxjava3/interop/DisposableHelperTest.java new file mode 100644 index 0000000..1b84b5a --- /dev/null +++ b/src/test/java/hu/akarnokd/rxjava3/interop/DisposableHelperTest.java @@ -0,0 +1,139 @@ +/* + * Copyright 2016-2020 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.interop; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.ProtocolViolationException; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +public class DisposableHelperTest { + @Test + public void enumMethods() { + assertEquals(1, DisposableHelper.values().length); + assertNotNull(DisposableHelper.valueOf("DISPOSED")); + } + + @Test + public void innerDisposed() { + assertTrue(DisposableHelper.DISPOSED.isDisposed()); + DisposableHelper.DISPOSED.dispose(); + assertTrue(DisposableHelper.DISPOSED.isDisposed()); + } + + @Test + public void validationNull() { + List list = TestHelper.trackPluginErrors(); + try { + assertFalse(DisposableHelper.validate(null, null)); + + TestHelper.assertError(list, 0, NullPointerException.class, "next is null"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void disposeRace() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + final AtomicReference d = new AtomicReference<>(); + + Runnable r = new Runnable() { + @Override + public void run() { + DisposableHelper.dispose(d); + } + }; + + TestHelper.race(r, r); + } + } + + @Test + public void dispose() { + Disposable u = Disposable.empty(); + final AtomicReference d = new AtomicReference<>(u); + + DisposableHelper.dispose(d); + + assertTrue(u.isDisposed()); + } + + @Test + public void trySet() { + AtomicReference ref = new AtomicReference<>(); + + Disposable d1 = Disposable.empty(); + + assertTrue(DisposableHelper.trySet(ref, d1)); + + Disposable d2 = Disposable.empty(); + + assertFalse(DisposableHelper.trySet(ref, d2)); + + assertFalse(d1.isDisposed()); + + assertFalse(d2.isDisposed()); + + DisposableHelper.dispose(ref); + + Disposable d3 = Disposable.empty(); + + assertFalse(DisposableHelper.trySet(ref, d3)); + + assertTrue(d3.isDisposed()); + } + + @Test + public void reportDisposableSet() throws Throwable { + TestHelper.withErrorTracking(errors -> { + + DisposableHelper.validate(Disposable.empty(), Disposable.empty()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + }); + } + + @Test + public void isDisposed() { + assertFalse(DisposableHelper.isDisposed(Disposable.empty())); + assertTrue(DisposableHelper.isDisposed(DisposableHelper.DISPOSED)); + } + + @Test + public void validate() { + assertTrue(DisposableHelper.validate(null, Disposable.empty())); + } + + @Test + public void setOnceAndReport() throws Throwable { + TestHelper.withErrorTracking(errors -> { + + AtomicReference ref = new AtomicReference<>(Disposable.empty()); + + assertFalse(DisposableHelper.setOnce(ref, Disposable.empty())); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + }); + } +} \ No newline at end of file diff --git a/src/test/java/hu/akarnokd/rxjava3/interop/RxJavaInteropTest.java b/src/test/java/hu/akarnokd/rxjava3/interop/RxJavaInteropTest.java index 47064f8..cf859b6 100644 --- a/src/test/java/hu/akarnokd/rxjava3/interop/RxJavaInteropTest.java +++ b/src/test/java/hu/akarnokd/rxjava3/interop/RxJavaInteropTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 David Karnok + * Copyright 2016-2020 David Karnok * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,11 +27,10 @@ import org.junit.Test; import org.reactivestreams.Subscription; -import hu.akarnokd.rxjava3.interop.RxJavaInterop; import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.Single; -import io.reactivex.rxjava3.disposables.*; +import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.functions.Predicate; import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; @@ -172,7 +171,7 @@ public boolean test(Throwable error) throws Throwable { } }); } - + @Test public void o1f3Error() { assertFailureAndMessage(toV3Flowable(rx.Observable.error(new RuntimeException("Forced failure"))) diff --git a/src/test/java/hu/akarnokd/rxjava3/interop/RxJavaInteropV1SchedulerToV3SchedulerTest.java b/src/test/java/hu/akarnokd/rxjava3/interop/RxJavaInteropV1SchedulerToV3SchedulerTest.java index ed113f4..ce13aa1 100644 --- a/src/test/java/hu/akarnokd/rxjava3/interop/RxJavaInteropV1SchedulerToV3SchedulerTest.java +++ b/src/test/java/hu/akarnokd/rxjava3/interop/RxJavaInteropV1SchedulerToV3SchedulerTest.java @@ -1,3 +1,19 @@ +/* + * Copyright 2016-2020 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package hu.akarnokd.rxjava3.interop; import static java.util.concurrent.TimeUnit.MINUTES; @@ -8,7 +24,6 @@ import org.junit.Test; -import hu.akarnokd.rxjava3.interop.RxJavaInterop; import rx.internal.schedulers.SchedulerLifecycle; public class RxJavaInteropV1SchedulerToV3SchedulerTest { diff --git a/src/test/java/hu/akarnokd/rxjava3/interop/RxJavaInteropV3SchedulerToV1SchedulerTest.java b/src/test/java/hu/akarnokd/rxjava3/interop/RxJavaInteropV3SchedulerToV1SchedulerTest.java index 0f4e213..cc20707 100644 --- a/src/test/java/hu/akarnokd/rxjava3/interop/RxJavaInteropV3SchedulerToV1SchedulerTest.java +++ b/src/test/java/hu/akarnokd/rxjava3/interop/RxJavaInteropV3SchedulerToV1SchedulerTest.java @@ -1,18 +1,16 @@ package hu.akarnokd.rxjava3.interop; -import io.reactivex.rxjava3.core.Scheduler; -import io.reactivex.rxjava3.schedulers.TestScheduler; +import static java.util.concurrent.TimeUnit.*; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + import org.junit.Test; -import hu.akarnokd.rxjava3.interop.RxJavaInterop; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.schedulers.TestScheduler; import rx.functions.Action0; import rx.internal.schedulers.SchedulerLifecycle; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - public class RxJavaInteropV3SchedulerToV1SchedulerTest { @Test diff --git a/src/test/java/hu/akarnokd/rxjava3/interop/SubscriptionHelperTest.java b/src/test/java/hu/akarnokd/rxjava3/interop/SubscriptionHelperTest.java new file mode 100644 index 0000000..5707191 --- /dev/null +++ b/src/test/java/hu/akarnokd/rxjava3/interop/SubscriptionHelperTest.java @@ -0,0 +1,170 @@ +/* + * Copyright 2016-2020 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.interop; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.*; + +import java.util.List; +import java.util.concurrent.atomic.*; + +import org.junit.Test; +import org.reactivestreams.Subscription; + +import io.reactivex.rxjava3.exceptions.ProtocolViolationException; +import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +public class SubscriptionHelperTest { + + @Test + public void checkEnum() { + TestHelper.checkEnum(SubscriptionHelper.class); + } + + @Test + public void validateNullThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + SubscriptionHelper.validate(null, null); + + TestHelper.assertError(errors, 0, NullPointerException.class, "next is null"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void cancelNoOp() { + SubscriptionHelper.CANCELLED.cancel(); + } + + @Test + public void cancelRace() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + final AtomicReference atomicSubscription = new AtomicReference<>(); + + Runnable r = new Runnable() { + @Override + public void run() { + SubscriptionHelper.cancel(atomicSubscription); + } + }; + + TestHelper.race(r, r); + } + } + + @Test + public void invalidDeferredRequest() { + AtomicReference atomicSubscription = new AtomicReference<>(); + AtomicLong r = new AtomicLong(); + + List errors = TestHelper.trackPluginErrors(); + try { + SubscriptionHelper.deferredRequest(atomicSubscription, r, -99); + + TestHelper.assertError(errors, 0, IllegalArgumentException.class, "n > 0 required but it was -99"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void deferredRace() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + final AtomicReference atomicSubscription = new AtomicReference<>(); + final AtomicLong r = new AtomicLong(); + + final AtomicLong q = new AtomicLong(); + + final Subscription a = new Subscription() { + @Override + public void request(long n) { + q.addAndGet(n); + } + + @Override + public void cancel() { + + } + }; + + Runnable r1 = new Runnable() { + @Override + public void run() { + SubscriptionHelper.deferredSetOnce(atomicSubscription, r, a); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + SubscriptionHelper.deferredRequest(atomicSubscription, r, 1); + } + }; + + TestHelper.race(r1, r2); + + assertSame(a, atomicSubscription.get()); + assertEquals(1, q.get()); + assertEquals(0, r.get()); + } + } + + @Test + public void setOnceAndRequest() { + AtomicReference ref = new AtomicReference<>(); + + Subscription sub = mock(Subscription.class); + + assertTrue(SubscriptionHelper.setOnce(ref, sub, 1)); + + verify(sub).request(1); + verify(sub, never()).cancel(); + + List errors = TestHelper.trackPluginErrors(); + try { + sub = mock(Subscription.class); + + assertFalse(SubscriptionHelper.setOnce(ref, sub, 1)); + + verify(sub, never()).request(anyLong()); + verify(sub).cancel(); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void reportDisposableSet() throws Throwable { + TestHelper.withErrorTracking(errors -> { + + SubscriptionHelper.validate(new BooleanSubscription(), new BooleanSubscription()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + }); + } + + @Test + public void validate() { + assertTrue(SubscriptionHelper.validate(null, new BooleanSubscription())); + } +} \ No newline at end of file diff --git a/src/test/java/hu/akarnokd/rxjava3/interop/TestException.java b/src/test/java/hu/akarnokd/rxjava3/interop/TestException.java new file mode 100644 index 0000000..13bb553 --- /dev/null +++ b/src/test/java/hu/akarnokd/rxjava3/interop/TestException.java @@ -0,0 +1,58 @@ +/* + * Copyright 2016-2020 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.interop; + +/** + * Exception for testing if unchecked expections propagate as-is without confusing with + * other type of common exceptions. + */ +public final class TestException extends RuntimeException { + + private static final long serialVersionUID = -1438148770465406172L; + + /** + * Constructs a TestException without message or cause. + */ + public TestException() { + super(); + } + + /** + * Counstructs a TestException with message and cause. + * @param message the message + * @param cause the cause + */ + public TestException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructs a TestException with a message only. + * @param message the message + */ + public TestException(String message) { + super(message); + } + + /** + * Constructs a TestException with a cause only. + * @param cause the cause + */ + public TestException(Throwable cause) { + super(cause); + } +} \ No newline at end of file diff --git a/src/test/java/hu/akarnokd/rxjava3/interop/TestHelper.java b/src/test/java/hu/akarnokd/rxjava3/interop/TestHelper.java new file mode 100644 index 0000000..bcd1b26 --- /dev/null +++ b/src/test/java/hu/akarnokd/rxjava3/interop/TestHelper.java @@ -0,0 +1,3094 @@ +/* + * Copyright 2016-2020 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hu.akarnokd.rxjava3.interop; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; + +import java.io.File; +import java.lang.management.*; +import java.lang.reflect.*; +import java.net.URL; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.reactivestreams.*; + +import io.reactivex.rxjava3.annotations.*; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.fuseable.*; +import io.reactivex.rxjava3.internal.operators.completable.CompletableToFlowable; +import io.reactivex.rxjava3.internal.operators.maybe.MaybeToFlowable; +import io.reactivex.rxjava3.internal.operators.single.SingleToFlowable; +import io.reactivex.rxjava3.internal.subscriptions.*; +import io.reactivex.rxjava3.internal.util.ExceptionHelper; +import io.reactivex.rxjava3.parallel.ParallelFlowable; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import io.reactivex.rxjava3.processors.PublishProcessor; +import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.subjects.Subject; +import io.reactivex.rxjava3.subscribers.TestSubscriber; + +/** + * Common methods for helping with tests. + */ +public enum TestHelper { + ; + + /** + * Number of times to loop a {@link #race(Runnable, Runnable)} invocation + * by default. + */ + public static final int RACE_DEFAULT_LOOPS = 2500; + + /** + * Number of times to loop a {@link #race(Runnable, Runnable)} invocation + * in tests with race conditions requiring more runs to check. + */ + public static final int RACE_LONG_LOOPS = 10000; + + /** + * Mocks a subscriber and prepares it to request {@link Long#MAX_VALUE}. + * @param the value type + * @return the mocked subscriber + */ + @SuppressWarnings("unchecked") + public static FlowableSubscriber mockSubscriber() { + FlowableSubscriber w = mock(FlowableSubscriber.class); + + Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock a) throws Throwable { + Subscription s = a.getArgument(0); + s.request(Long.MAX_VALUE); + return null; + } + }).when(w).onSubscribe((Subscription)any()); + + return w; + } + + /** + * Mocks an Observer with the proper receiver type. + * @param the value type + * @return the mocked observer + */ + @SuppressWarnings("unchecked") + public static Observer mockObserver() { + return mock(Observer.class); + } + + /** + * Mocks an MaybeObserver with the proper receiver type. + * @param the value type + * @return the mocked observer + */ + @SuppressWarnings("unchecked") + public static MaybeObserver mockMaybeObserver() { + return mock(MaybeObserver.class); + } + + /** + * Mocks an SingleObserver with the proper receiver type. + * @param the value type + * @return the mocked observer + */ + @SuppressWarnings("unchecked") + public static SingleObserver mockSingleObserver() { + return mock(SingleObserver.class); + } + + /** + * Mocks an CompletableObserver. + * @return the mocked observer + */ + public static CompletableObserver mockCompletableObserver() { + return mock(CompletableObserver.class); + } + + /** + * Validates that the given class, when forcefully instantiated throws + * an IllegalArgumentException("No instances!") exception. + * @param clazz the class to test, not null + */ + public static void checkUtilityClass(Class clazz) { + try { + Constructor c = clazz.getDeclaredConstructor(); + + c.setAccessible(true); + + try { + c.newInstance(); + fail("Should have thrown InvocationTargetException(IllegalStateException)"); + } catch (InvocationTargetException ex) { + assertEquals("No instances!", ex.getCause().getMessage()); + } + } catch (Exception ex) { + AssertionError ae = new AssertionError(ex.toString()); + ae.initCause(ex); + throw ae; + } + } + + public static List trackPluginErrors() { + final List list = Collections.synchronizedList(new ArrayList<>()); + + RxJavaPlugins.setErrorHandler(new Consumer() { + @Override + public void accept(Throwable t) { + list.add(t); + } + }); + + return list; + } + + public static void assertError(List list, int index, Class clazz) { + Throwable ex = list.get(index); + if (!clazz.isInstance(ex)) { + AssertionError err = new AssertionError(clazz + " expected but got " + list.get(index)); + err.initCause(list.get(index)); + throw err; + } + } + + public static void assertUndeliverable(List list, int index, Class clazz) { + Throwable ex = list.get(index); + if (!(ex instanceof UndeliverableException)) { + AssertionError err = new AssertionError("Outer exception UndeliverableException expected but got " + list.get(index)); + err.initCause(list.get(index)); + throw err; + } + ex = ex.getCause(); + if (!clazz.isInstance(ex)) { + AssertionError err = new AssertionError("Inner exception " + clazz + " expected but got " + list.get(index)); + err.initCause(list.get(index)); + throw err; + } + } + + public static void assertError(List list, int index, Class clazz, String message) { + Throwable ex = list.get(index); + if (!clazz.isInstance(ex)) { + AssertionError err = new AssertionError("Type " + clazz + " expected but got " + ex); + err.initCause(ex); + throw err; + } + if (!Objects.equals(message, ex.getMessage())) { + AssertionError err = new AssertionError("Message " + message + " expected but got " + ex.getMessage()); + err.initCause(ex); + throw err; + } + } + + public static void assertUndeliverable(List list, int index, Class clazz, String message) { + Throwable ex = list.get(index); + if (!(ex instanceof UndeliverableException)) { + AssertionError err = new AssertionError("Outer exception UndeliverableException expected but got " + list.get(index)); + err.initCause(list.get(index)); + throw err; + } + ex = ex.getCause(); + if (!clazz.isInstance(ex)) { + AssertionError err = new AssertionError("Inner exception " + clazz + " expected but got " + list.get(index)); + err.initCause(list.get(index)); + throw err; + } + if (!Objects.equals(message, ex.getMessage())) { + AssertionError err = new AssertionError("Message " + message + " expected but got " + ex.getMessage()); + err.initCause(ex); + throw err; + } + } + + /** + * Verify that a specific enum type has no enum constants. + * @param the enum type + * @param e the enum class instance + */ + public static > void assertEmptyEnum(Class e) { + assertEquals(0, e.getEnumConstants().length); + + try { + try { + Method m0 = e.getDeclaredMethod("values"); + + Object[] a = (Object[])m0.invoke(null); + assertEquals(0, a.length); + + Method m = e.getDeclaredMethod("valueOf", String.class); + + m.invoke("INSTANCE"); + fail("Should have thrown!"); + } catch (InvocationTargetException ex) { + fail(ex.toString()); + } catch (IllegalAccessException ex) { + fail(ex.toString()); + } catch (IllegalArgumentException ex) { + // we expected this + } + } catch (NoSuchMethodException ex) { + fail(ex.toString()); + } + } + + /** + * Assert that by consuming the Publisher with a bad request amount, it is + * reported to the plugin error handler promptly. + * @param source the source to consume + */ + public static void assertBadRequestReported(Publisher source) { + List list = trackPluginErrors(); + try { + final CountDownLatch cdl = new CountDownLatch(1); + + source.subscribe(new FlowableSubscriber() { + + @Override + public void onSubscribe(Subscription s) { + try { + s.request(-99); + s.cancel(); + s.cancel(); + } finally { + cdl.countDown(); + } + } + + @Override + public void onNext(Object t) { + + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onComplete() { + + } + + }); + + try { + assertTrue(cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw new AssertionError(ex.getMessage()); + } + + assertTrue(list.toString(), list.get(0) instanceof IllegalArgumentException); + assertEquals("n > 0 required but it was -99", list.get(0).getMessage()); + } finally { + RxJavaPlugins.setErrorHandler(null); + } + } + + /** + * Assert that by consuming the Publisher with a bad request amount, it is + * reported to the plugin error handler promptly. + * @param source the source to consume + */ + public static void assertBadRequestReported(ParallelFlowable source) { + List list = trackPluginErrors(); + try { + final CountDownLatch cdl = new CountDownLatch(1); + + FlowableSubscriber bad = new FlowableSubscriber() { + + @Override + public void onSubscribe(Subscription s) { + try { + s.request(-99); + s.cancel(); + s.cancel(); + } finally { + cdl.countDown(); + } + } + + @Override + public void onNext(Object t) { + + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onComplete() { + + } + + }; + + @SuppressWarnings("unchecked") + FlowableSubscriber[] subs = new FlowableSubscriber[source.parallelism()]; + subs[0] = bad; + for (int i = 1; i < subs.length; i++) { + subs[i] = NoOpConsumer.INSTANCE; + } + source.subscribe(subs); + + try { + assertTrue(cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw new AssertionError(ex.getMessage()); + } + + assertTrue(list.toString(), list.get(0) instanceof IllegalArgumentException); + assertEquals("n > 0 required but it was -99", list.get(0).getMessage()); + } finally { + RxJavaPlugins.setErrorHandler(null); + } + } + + /** + * Synchronizes the execution of two runnables (as much as possible) + * to test race conditions. + *

The method blocks until both have run to completion. + * @param r1 the first runnable + * @param r2 the second runnable + * @see #RACE_DEFAULT_LOOPS + * @see #RACE_LONG_LOOPS + */ + public static void race(final Runnable r1, final Runnable r2) { + race(r1, r2, Schedulers.single()); + } + /** + * Synchronizes the execution of two runnables (as much as possible) + * to test race conditions. + *

The method blocks until both have run to completion. + * @param r1 the first runnable + * @param r2 the second runnable + * @param s the scheduler to use + * @see #RACE_DEFAULT_LOOPS + * @see #RACE_LONG_LOOPS + */ + public static void race(final Runnable r1, final Runnable r2, Scheduler s) { + final AtomicInteger count = new AtomicInteger(2); + final CountDownLatch cdl = new CountDownLatch(2); + + final Throwable[] errors = { null, null }; + + s.scheduleDirect(new Runnable() { + @Override + public void run() { + if (count.decrementAndGet() != 0) { + while (count.get() != 0) { } + } + + try { + try { + r1.run(); + } catch (Throwable ex) { + errors[0] = ex; + } + } finally { + cdl.countDown(); + } + } + }); + + if (count.decrementAndGet() != 0) { + while (count.get() != 0) { } + } + + try { + try { + r2.run(); + } catch (Throwable ex) { + errors[1] = ex; + } + } finally { + cdl.countDown(); + } + + try { + if (!cdl.await(5, TimeUnit.SECONDS)) { + throw new AssertionError("The wait timed out!"); + } + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + if (errors[0] != null && errors[1] == null) { + throw ExceptionHelper.wrapOrThrow(errors[0]); + } + + if (errors[0] == null && errors[1] != null) { + throw ExceptionHelper.wrapOrThrow(errors[1]); + } + + if (errors[0] != null && errors[1] != null) { + throw new CompositeException(errors); + } + } + + /** + * Cast the given Throwable to CompositeException and returns its inner + * Throwable list. + * @param ex the target Throwable + * @return the list of Throwables + */ + public static List compositeList(Throwable ex) { + if (ex instanceof UndeliverableException) { + ex = ex.getCause(); + } + return ((CompositeException)ex).getExceptions(); + } + + /** + * Assert that the offer methods throw UnsupportedOperationExcetpion. + * @param q the queue implementation + */ + public static void assertNoOffer(SimpleQueue q) { + try { + q.offer(null); + fail("Should have thrown!"); + } catch (UnsupportedOperationException ex) { + // expected + } + try { + q.offer(null, null); + fail("Should have thrown!"); + } catch (UnsupportedOperationException ex) { + // expected + } + } + + @SuppressWarnings("unchecked") + public static > void checkEnum(Class enumClass) { + try { + Method m = enumClass.getMethod("values"); + m.setAccessible(true); + Method e = enumClass.getMethod("valueOf", String.class); + m.setAccessible(true); + + for (Enum o : (Enum[])m.invoke(null)) { + assertSame(o, e.invoke(null, o.name())); + } + + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + /** + * Calls onSubscribe twice and checks if it doesn't affect the first Subscription while + * reporting it to plugin error handler. + * @param subscriber the target + */ + public static void doubleOnSubscribe(Subscriber subscriber) { + List errors = trackPluginErrors(); + try { + BooleanSubscription s1 = new BooleanSubscription(); + + subscriber.onSubscribe(s1); + + BooleanSubscription s2 = new BooleanSubscription(); + + subscriber.onSubscribe(s2); + + assertFalse(s1.isCancelled()); + + assertTrue(s2.isCancelled()); + + assertError(errors, 0, IllegalStateException.class, "Subscription already set!"); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Calls onSubscribe twice and checks if it doesn't affect the first Disposable while + * reporting it to plugin error handler. + * @param observer the target + */ + public static void doubleOnSubscribe(Observer observer) { + List errors = trackPluginErrors(); + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + assertFalse(d1.isDisposed()); + + assertTrue(d2.isDisposed()); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Calls onSubscribe twice and checks if it doesn't affect the first Disposable while + * reporting it to plugin error handler. + * @param observer the target + */ + public static void doubleOnSubscribe(SingleObserver observer) { + List errors = trackPluginErrors(); + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + assertFalse(d1.isDisposed()); + + assertTrue(d2.isDisposed()); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Calls onSubscribe twice and checks if it doesn't affect the first Disposable while + * reporting it to plugin error handler. + * @param observer the target + */ + public static void doubleOnSubscribe(CompletableObserver observer) { + List errors = trackPluginErrors(); + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + assertFalse(d1.isDisposed()); + + assertTrue(d2.isDisposed()); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Calls onSubscribe twice and checks if it doesn't affect the first Disposable while + * reporting it to plugin error handler. + * @param observer the target + */ + public static void doubleOnSubscribe(MaybeObserver observer) { + List errors = trackPluginErrors(); + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + assertFalse(d1.isDisposed()); + + assertTrue(d2.isDisposed()); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } finally { + RxJavaPlugins.reset(); + } + } + + public static void checkDisposed(Disposable d) { + assertFalse("Disposed upfront?!", d.isDisposed()); + + d.dispose(); + + assertTrue("Not disposed?!", d.isDisposed()); + + d.dispose(); + + assertTrue("Not disposed again?!", d.isDisposed()); + } + + /** + * Checks if the upstream's Subscription sent through the onSubscribe reports + * isCancelled properly before and after calling dispose. + * @param the input value type + * @param source the source to test + */ + public static void checkDisposed(Flowable source) { + final TestSubscriber ts = new TestSubscriber<>(0L); + source.subscribe(new FlowableSubscriber() { + @Override + public void onSubscribe(Subscription s) { + ts.onSubscribe(new BooleanSubscription()); + + s.cancel(); + + s.cancel(); + } + + @Override + public void onNext(Object t) { + ts.onNext(t); + } + + @Override + public void onError(Throwable t) { + ts.onError(t); + } + + @Override + public void onComplete() { + ts.onComplete(); + } + }); + ts.assertEmpty(); + } + /** + * Checks if the upstream's Disposable sent through the onSubscribe reports + * isDisposed properly before and after calling dispose. + * @param source the source to test + */ + public static void checkDisposed(Maybe source) { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + source.subscribe(new MaybeObserver() { + + @Override + public void onSubscribe(Disposable d) { + try { + b[0] = d.isDisposed(); + + d.dispose(); + + b[1] = d.isDisposed(); + + d.dispose(); + } finally { + cdl.countDown(); + } + } + + @Override + public void onSuccess(Object value) { + // ignored + } + + @Override + public void onError(Throwable e) { + // ignored + } + + @Override + public void onComplete() { + // ignored + } + }); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("Reports disposed upfront?", false, b[0]); + assertEquals("Didn't report disposed after?", true, b[1]); + } + + /** + * Checks if the upstream's Disposable sent through the onSubscribe reports + * isDisposed properly before and after calling dispose. + * @param source the source to test + */ + public static void checkDisposed(Observable source) { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + source.subscribe(new Observer() { + + @Override + public void onSubscribe(Disposable d) { + try { + b[0] = d.isDisposed(); + + d.dispose(); + + b[1] = d.isDisposed(); + + d.dispose(); + } finally { + cdl.countDown(); + } + } + + @Override + public void onNext(Object value) { + // ignored + } + + @Override + public void onError(Throwable e) { + // ignored + } + + @Override + public void onComplete() { + // ignored + } + }); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("Reports disposed upfront?", false, b[0]); + assertEquals("Didn't report disposed after?", true, b[1]); + } + + /** + * Checks if the upstream's Disposable sent through the onSubscribe reports + * isDisposed properly before and after calling dispose. + * @param source the source to test + */ + public static void checkDisposed(Single source) { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + source.subscribe(new SingleObserver() { + + @Override + public void onSubscribe(Disposable d) { + try { + b[0] = d.isDisposed(); + + d.dispose(); + + b[1] = d.isDisposed(); + + d.dispose(); + } finally { + cdl.countDown(); + } + } + + @Override + public void onSuccess(Object value) { + // ignored + } + + @Override + public void onError(Throwable e) { + // ignored + } + }); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("Reports disposed upfront?", false, b[0]); + assertEquals("Didn't report disposed after?", true, b[1]); + } + + /** + * Checks if the upstream's Disposable sent through the onSubscribe reports + * isDisposed properly before and after calling dispose. + * @param source the source to test + */ + public static void checkDisposed(Completable source) { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + source.subscribe(new CompletableObserver() { + + @Override + public void onSubscribe(Disposable d) { + try { + b[0] = d.isDisposed(); + + d.dispose(); + + b[1] = d.isDisposed(); + + d.dispose(); + } finally { + cdl.countDown(); + } + } + + @Override + public void onError(Throwable e) { + // ignored + } + + @Override + public void onComplete() { + // ignored + } + }); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("Reports disposed upfront?", false, b[0]); + assertEquals("Didn't report disposed after?", true, b[1]); + } + + /** + * Consumer for all base reactive types. + */ + enum NoOpConsumer implements FlowableSubscriber, Observer, MaybeObserver, SingleObserver, CompletableObserver { + INSTANCE; + + @Override + public void onSubscribe(Disposable d) { + // deliberately no-op + } + + @Override + public void onSuccess(Object value) { + // deliberately no-op + } + + @Override + public void onError(Throwable e) { + // deliberately no-op + } + + @Override + public void onComplete() { + // deliberately no-op + } + + @Override + public void onSubscribe(Subscription s) { + // deliberately no-op + } + + @Override + public void onNext(Object t) { + // deliberately no-op + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeMaybe(Function, ? extends MaybeSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Maybe source = new Maybe() { + @Override + protected void subscribeActual(MaybeObserver observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + MaybeSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeMaybeToSingle(Function, ? extends SingleSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Maybe source = new Maybe() { + @Override + protected void subscribeActual(MaybeObserver observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + SingleSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeMaybeToObservable(Function, ? extends ObservableSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Maybe source = new Maybe() { + @Override + protected void subscribeActual(MaybeObserver observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + ObservableSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeMaybeToFlowable(Function, ? extends Publisher> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Maybe source = new Maybe() { + @Override + protected void subscribeActual(MaybeObserver observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + Publisher out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeSingleToMaybe(Function, ? extends MaybeSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Single source = new Single() { + @Override + protected void subscribeActual(SingleObserver observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + MaybeSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeSingleToObservable(Function, ? extends ObservableSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Single source = new Single() { + @Override + protected void subscribeActual(SingleObserver observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + ObservableSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeSingleToFlowable(Function, ? extends Publisher> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Single source = new Single() { + @Override + protected void subscribeActual(SingleObserver observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + Publisher out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeMaybeToCompletable(Function, ? extends CompletableSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Maybe source = new Maybe() { + @Override + protected void subscribeActual(MaybeObserver observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + CompletableSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeSingle(Function, ? extends SingleSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Single source = new Single() { + @Override + protected void subscribeActual(SingleObserver observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + SingleSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeFlowable(Function, ? extends Publisher> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Flowable source = new Flowable() { + @Override + protected void subscribeActual(Subscriber subscriber) { + try { + BooleanSubscription bs1 = new BooleanSubscription(); + + subscriber.onSubscribe(bs1); + + BooleanSubscription bs2 = new BooleanSubscription(); + + subscriber.onSubscribe(bs2); + + b[0] = bs1.isCancelled(); + b[1] = bs2.isCancelled(); + } finally { + cdl.countDown(); + } + } + }; + + Publisher out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Subscription already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param transform the transform to drive an operator + */ + @SuppressWarnings("unchecked") + public static void checkDoubleOnSubscribeParallel(Function, ? extends ParallelFlowable> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null, null, null }; + final CountDownLatch cdl = new CountDownLatch(2); + + ParallelFlowable source = new ParallelFlowable() { + @Override + public void subscribe(Subscriber[] subscribers) { + for (int i = 0; i < subscribers.length; i++) { + try { + BooleanSubscription bs1 = new BooleanSubscription(); + + subscribers[i].onSubscribe(bs1); + + BooleanSubscription bs2 = new BooleanSubscription(); + + subscribers[i].onSubscribe(bs2); + + b[i * 2 + 0] = bs1.isCancelled(); + b[i * 2 + 1] = bs2.isCancelled(); + } finally { + cdl.countDown(); + } + } + } + + @Override + public int parallelism() { + return 2; + } + }; + + ParallelFlowable out = transform.apply(source); + + out.subscribe(new Subscriber[] { NoOpConsumer.INSTANCE, NoOpConsumer.INSTANCE }); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("Rail 1 First disposed?", false, b[0]); + assertEquals("Rail 1 Second not disposed?", true, b[1]); + + assertEquals("Rail 2 First disposed?", false, b[2]); + assertEquals("Rail 2 Second not disposed?", true, b[3]); + + assertError(errors, 0, IllegalStateException.class, "Subscription already set!"); + assertError(errors, 1, IllegalStateException.class, "Subscription already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeParallelToFlowable(Function, ? extends Flowable> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null, null, null }; + final CountDownLatch cdl = new CountDownLatch(2); + + ParallelFlowable source = new ParallelFlowable() { + @Override + public void subscribe(Subscriber[] subscribers) { + for (int i = 0; i < subscribers.length; i++) { + try { + BooleanSubscription bs1 = new BooleanSubscription(); + + subscribers[i].onSubscribe(bs1); + + BooleanSubscription bs2 = new BooleanSubscription(); + + subscribers[i].onSubscribe(bs2); + + b[i * 2 + 0] = bs1.isCancelled(); + b[i * 2 + 1] = bs2.isCancelled(); + } finally { + cdl.countDown(); + } + } + } + + @Override + public int parallelism() { + return 2; + } + }; + + Flowable out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("Rail 1 First disposed?", false, b[0]); + assertEquals("Rail 1 Second not disposed?", true, b[1]); + + assertEquals("Rail 2 First disposed?", false, b[2]); + assertEquals("Rail 2 Second not disposed?", true, b[3]); + + assertError(errors, 0, IllegalStateException.class, "Subscription already set!"); + assertError(errors, 1, IllegalStateException.class, "Subscription already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeObservable(Function, ? extends ObservableSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Observable source = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + ObservableSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeObservableToSingle(Function, ? extends SingleSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Observable source = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + SingleSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeObservableToMaybe(Function, ? extends MaybeSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Observable source = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + MaybeSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeObservableToCompletable(Function, ? extends CompletableSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Observable source = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + CompletableSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeFlowableToObservable(Function, ? extends ObservableSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Flowable source = new Flowable() { + @Override + protected void subscribeActual(Subscriber subscriber) { + try { + BooleanSubscription bs1 = new BooleanSubscription(); + + subscriber.onSubscribe(bs1); + + BooleanSubscription bs2 = new BooleanSubscription(); + + subscriber.onSubscribe(bs2); + + b[0] = bs1.isCancelled(); + b[1] = bs2.isCancelled(); + } finally { + cdl.countDown(); + } + } + }; + + ObservableSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First cancelled?", false, b[0]); + assertEquals("Second not cancelled?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Subscription already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeFlowableToSingle(Function, ? extends SingleSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Flowable source = new Flowable() { + @Override + protected void subscribeActual(Subscriber subscriber) { + try { + BooleanSubscription bs1 = new BooleanSubscription(); + + subscriber.onSubscribe(bs1); + + BooleanSubscription bs2 = new BooleanSubscription(); + + subscriber.onSubscribe(bs2); + + b[0] = bs1.isCancelled(); + b[1] = bs2.isCancelled(); + } finally { + cdl.countDown(); + } + } + }; + + SingleSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First cancelled?", false, b[0]); + assertEquals("Second not cancelled?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Subscription already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeFlowableToMaybe(Function, ? extends MaybeSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Flowable source = new Flowable() { + @Override + protected void subscribeActual(Subscriber subscriber) { + try { + BooleanSubscription bs1 = new BooleanSubscription(); + + subscriber.onSubscribe(bs1); + + BooleanSubscription bs2 = new BooleanSubscription(); + + subscriber.onSubscribe(bs2); + + b[0] = bs1.isCancelled(); + b[1] = bs2.isCancelled(); + } finally { + cdl.countDown(); + } + } + }; + + MaybeSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First cancelled?", false, b[0]); + assertEquals("Second not cancelled?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Subscription already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeFlowableToCompletable(Function, ? extends Completable> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Flowable source = new Flowable() { + @Override + protected void subscribeActual(Subscriber subscriber) { + try { + BooleanSubscription bs1 = new BooleanSubscription(); + + subscriber.onSubscribe(bs1); + + BooleanSubscription bs2 = new BooleanSubscription(); + + subscriber.onSubscribe(bs2); + + b[0] = bs1.isCancelled(); + b[1] = bs2.isCancelled(); + } finally { + cdl.countDown(); + } + } + }; + + Completable out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First cancelled?", false, b[0]); + assertEquals("Second not cancelled?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Subscription already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeCompletable(Function transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Completable source = new Completable() { + @Override + protected void subscribeActual(CompletableObserver observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + CompletableSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the output value tye + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeCompletableToMaybe(Function> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Completable source = new Completable() { + @Override + protected void subscribeActual(CompletableObserver observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + MaybeSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the output value tye + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeCompletableToSingle(Function> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Completable source = new Completable() { + @Override + protected void subscribeActual(CompletableObserver observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + SingleSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeCompletableToFlowable(Function> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Completable source = new Completable() { + @Override + protected void subscribeActual(CompletableObserver observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + Publisher out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeCompletableToObservable(Function> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Completable source = new Completable() { + @Override + protected void subscribeActual(CompletableObserver observer) { + try { + Disposable d1 = Disposable.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposable.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + ObservableSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the operator applied to a Maybe source propagates dispose properly. + * @param the source value type + * @param the output value type + * @param composer the function to apply an operator to the provided Maybe source + */ + public static void checkDisposedMaybe(Function, ? extends MaybeSource> composer) { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = new TestSubscriber<>(); + + try { + new MaybeToFlowable<>(composer.apply(pp.singleElement())).subscribe(ts); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertTrue("Not subscribed to source!", pp.hasSubscribers()); + + ts.cancel(); + + assertFalse("Dispose not propagated!", pp.hasSubscribers()); + } + + /** + * Check if the operator applied to a Completable source propagates dispose properly. + * @param composer the function to apply an operator to the provided Completable source + */ + public static void checkDisposedCompletable(Function composer) { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = new TestSubscriber<>(); + + try { + new CompletableToFlowable(composer.apply(pp.ignoreElements())).subscribe(ts); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertTrue("Not subscribed to source!", pp.hasSubscribers()); + + ts.cancel(); + + assertFalse("Dispose not propagated!", pp.hasSubscribers()); + } + + /** + * Check if the operator applied to a Maybe source propagates dispose properly. + * @param the source value type + * @param the output value type + * @param composer the function to apply an operator to the provided Maybe source + */ + public static void checkDisposedMaybeToSingle(Function, ? extends SingleSource> composer) { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = new TestSubscriber<>(); + + try { + new SingleToFlowable<>(composer.apply(pp.singleElement())).subscribe(ts); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertTrue(pp.hasSubscribers()); + + ts.cancel(); + + assertFalse(pp.hasSubscribers()); + } + + /** + * Check if the operator applied to a Maybe source propagates dispose properly. + * @param the source value type + * @param the output value type + * @param composer the function to apply an operator to the provided Maybe source + */ + public static void checkDisposedSingleToMaybe(Function, ? extends MaybeSource> composer) { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = new TestSubscriber<>(); + + try { + new MaybeToFlowable<>(composer.apply(pp.singleOrError())).subscribe(ts); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertTrue(pp.hasSubscribers()); + + ts.cancel(); + + assertFalse(pp.hasSubscribers()); + } + + /** + * Emit the given values and complete the Processor. + * @param the value type + * @param p the target processor + * @param values the values to emit + */ + @SafeVarargs + public static void emit(Processor p, T... values) { + for (T v : values) { + p.onNext(v); + } + p.onComplete(); + } + + /** + * Emit the given values and complete the Subject. + * @param the value type + * @param p the target subject + * @param values the values to emit + */ + @SafeVarargs + public static void emit(Subject p, T... values) { + for (T v : values) { + p.onNext(v); + } + p.onComplete(); + } + + /** + * Checks if the source is fuseable and its isEmpty/clear works properly. + * @param the value type + * @param source the source sequence + */ + public static void checkFusedIsEmptyClear(Observable source) { + final CountDownLatch cdl = new CountDownLatch(1); + + final Boolean[] state = { null, null, null, null }; + + source.subscribe(new Observer() { + @Override + public void onSubscribe(Disposable d) { + try { + if (d instanceof QueueDisposable) { + @SuppressWarnings("unchecked") + QueueDisposable qd = (QueueDisposable) d; + state[0] = true; + + int m = qd.requestFusion(QueueFuseable.ANY); + + if (m != QueueFuseable.NONE) { + state[1] = true; + + state[2] = qd.isEmpty(); + + qd.clear(); + + state[3] = qd.isEmpty(); + } + } + cdl.countDown(); + } finally { + d.dispose(); + } + } + + @Override + public void onNext(T value) { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onComplete() { + + } + }); + + try { + assertTrue(cdl.await(5, TimeUnit.SECONDS)); + + assertTrue("Not fuseable", state[0]); + assertTrue("Fusion rejected", state[1]); + + assertNotNull(state[2]); + assertTrue("Did not empty", state[3]); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + + /** + * Checks if the source is fuseable and its isEmpty/clear works properly. + * @param the value type + * @param source the source sequence + */ + public static void checkFusedIsEmptyClear(Flowable source) { + final CountDownLatch cdl = new CountDownLatch(1); + + final Boolean[] state = { null, null, null, null }; + + source.subscribe(new FlowableSubscriber() { + @Override + public void onSubscribe(Subscription s) { + try { + if (s instanceof QueueSubscription) { + @SuppressWarnings("unchecked") + QueueSubscription qs = (QueueSubscription) s; + state[0] = true; + + int m = qs.requestFusion(QueueFuseable.ANY); + + if (m != QueueFuseable.NONE) { + state[1] = true; + + state[2] = qs.isEmpty(); + + qs.clear(); + + state[3] = qs.isEmpty(); + } + } + cdl.countDown(); + } finally { + s.cancel(); + } + } + + @Override + public void onNext(T value) { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onComplete() { + + } + }); + + try { + assertTrue(cdl.await(5, TimeUnit.SECONDS)); + + assertTrue("Not fuseable", state[0]); + assertTrue("Fusion rejected", state[1]); + + assertNotNull(state[2]); + assertTrue("Did not empty", state[3]); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + + public static void checkInvalidParallelSubscribers(ParallelFlowable source) { + int n = source.parallelism(); + + @SuppressWarnings("unchecked") + TestSubscriber[] tss = new TestSubscriber[n + 1]; + for (int i = 0; i <= n; i++) { + tss[i] = new TestSubscriber<>().withTag("" + i); + } + + source.subscribe(tss); + + for (int i = 0; i <= n; i++) { + tss[i].assertFailure(IllegalArgumentException.class); + } + } + + public static Observable rejectObservableFusion() { + return new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(new QueueDisposable() { + + @Override + public int requestFusion(int mode) { + return 0; + } + + @Override + public boolean offer(T value) { + throw new IllegalStateException(); + } + + @Override + public boolean offer(T v1, T v2) { + throw new IllegalStateException(); + } + + @Override + public T poll() throws Exception { + return null; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public void clear() { + } + + @Override + public void dispose() { + } + + @Override + public boolean isDisposed() { + return false; + } + }); + } + }; + } + + public static Flowable rejectFlowableFusion() { + return new Flowable() { + @Override + protected void subscribeActual(Subscriber subscriber) { + subscriber.onSubscribe(new QueueSubscription() { + + @Override + public int requestFusion(int mode) { + return 0; + } + + @Override + public boolean offer(T value) { + throw new IllegalStateException(); + } + + @Override + public boolean offer(T v1, T v2) { + throw new IllegalStateException(); + } + + @Override + public T poll() throws Exception { + return null; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public void clear() { + } + + @Override + public void cancel() { + } + + @Override + public void request(long n) { + } + }); + } + }; + } + + static final class FlowableStripBoundary extends Flowable implements FlowableTransformer { + + final Flowable source; + + FlowableStripBoundary(Flowable source) { + this.source = source; + } + + @Override + public Flowable apply(Flowable upstream) { + return new FlowableStripBoundary<>(upstream); + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new StripBoundarySubscriber<>(s)); + } + + static final class StripBoundarySubscriber implements FlowableSubscriber, QueueSubscription { + + final Subscriber downstream; + + Subscription upstream; + + QueueSubscription qs; + + StripBoundarySubscriber(Subscriber downstream) { + this.downstream = downstream; + } + + @SuppressWarnings("unchecked") + @Override + public void onSubscribe(Subscription subscription) { + this.upstream = subscription; + if (subscription instanceof QueueSubscription) { + qs = (QueueSubscription)subscription; + } + downstream.onSubscribe(this); + } + + @Override + public void onNext(T t) { + downstream.onNext(t); + } + + @Override + public void onError(Throwable throwable) { + downstream.onError(throwable); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public int requestFusion(int mode) { + QueueSubscription fs = qs; + if (fs != null) { + return fs.requestFusion(mode & ~BOUNDARY); + } + return NONE; + } + + @Override + public boolean offer(T value) { + throw new UnsupportedOperationException("Should not be called"); + } + + @Override + public boolean offer(T v1, T v2) { + throw new UnsupportedOperationException("Should not be called"); + } + + @Override + public T poll() throws Throwable { + return qs.poll(); + } + + @Override + public void clear() { + qs.clear(); + } + + @Override + public boolean isEmpty() { + return qs.isEmpty(); + } + + @Override + public void request(long n) { + upstream.request(n); + } + + @Override + public void cancel() { + upstream.cancel(); + } + } + } + + /** + * Strips the {@link QueueFuseable#BOUNDARY} mode flag when the downstream calls {@link QueueSubscription#requestFusion(int)}. + *

+ * By default, many operators use {@link QueueFuseable#BOUNDARY} to indicate upstream side-effects + * should not leak over a fused boundary. However, some tests want to verify if {@link QueueSubscription#poll()} crashes + * are handled correctly and the most convenient way is to crash {@link Flowable#map} that won't fuse with {@code BOUNDARY} + * flag. This transformer strips this flag and thus allows the function of {@code map} to be executed as part of the + * {@code poll()} chain. + * @param the element type of the flow + * @return the new Transformer instance + */ + public static FlowableTransformer flowableStripBoundary() { + return new FlowableStripBoundary<>(null); + } + + static final class ObservableStripBoundary extends Observable implements ObservableTransformer { + + final Observable source; + + ObservableStripBoundary(Observable source) { + this.source = source; + } + + @Override + public Observable apply(Observable upstream) { + return new ObservableStripBoundary<>(upstream); + } + + @Override + protected void subscribeActual(Observer observer) { + source.subscribe(new StripBoundaryObserver<>(observer)); + } + + static final class StripBoundaryObserver implements Observer, QueueDisposable { + + final Observer downstream; + + Disposable upstream; + + QueueDisposable qd; + + StripBoundaryObserver(Observer downstream) { + this.downstream = downstream; + } + + @SuppressWarnings("unchecked") + @Override + public void onSubscribe(Disposable d) { + this.upstream = d; + if (d instanceof QueueDisposable) { + qd = (QueueDisposable)d; + } + downstream.onSubscribe(this); + } + + @Override + public void onNext(T t) { + downstream.onNext(t); + } + + @Override + public void onError(Throwable throwable) { + downstream.onError(throwable); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public int requestFusion(int mode) { + QueueDisposable fs = qd; + if (fs != null) { + return fs.requestFusion(mode & ~BOUNDARY); + } + return NONE; + } + + @Override + public boolean offer(T value) { + throw new UnsupportedOperationException("Should not be called"); + } + + @Override + public boolean offer(T v1, T v2) { + throw new UnsupportedOperationException("Should not be called"); + } + + @Override + public T poll() throws Throwable { + return qd.poll(); + } + + @Override + public void clear() { + qd.clear(); + } + + @Override + public boolean isEmpty() { + return qd.isEmpty(); + } + + @Override + public void dispose() { + upstream.dispose(); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + } + } + + /** + * Strips the {@link QueueFuseable#BOUNDARY} mode flag when the downstream calls {@link QueueDisposable#requestFusion(int)}. + *

+ * By default, many operators use {@link QueueFuseable#BOUNDARY} to indicate upstream side-effects + * should not leak over a fused boundary. However, some tests want to verify if {@link QueueDisposable#poll()} crashes + * are handled correctly and the most convenient way is to crash {@link Observable#map} that won't fuse with {@code BOUNDARY} + * flag. This transformer strips this flag and thus allows the function of {@code map} to be executed as part of the + * {@code poll()} chain. + * @param the element type of the flow + * @return the new Transformer instance + */ + public static ObservableTransformer observableStripBoundary() { + return new ObservableStripBoundary<>(null); + } + + /** + * Given a base reactive type name, try to find its source in the current runtime + * path and return a file to it or null if not found. + * @param baseClassName the class name such as {@code Maybe} + * @return the File pointing to the source + * @throws Exception on error + */ + public static File findSource(String baseClassName) throws Exception { + return findSource(baseClassName, "io.reactivex.rxjava3.core"); + } + + /** + * Given a base reactive type name, try to find its source in the current runtime + * path and return a file to it or null if not found. + * @param baseClassName the class name such as {@code Maybe} + * @param parentPackage the parent package such as {@code io.reactivex.rxjava3.core} + * @return the File pointing to the source + * @throws Exception on error + */ + public static File findSource(String baseClassName, String parentPackage) throws Exception { + URL u = TestHelper.class.getResource(TestHelper.class.getSimpleName() + ".class"); + + String path = new File(u.toURI()).toString().replace('\\', '/'); + + parentPackage = parentPackage.replace(".", "/"); +// System.out.println(path); + + int i = path.toLowerCase().indexOf("/rxjava"); + if (i < 0) { + System.out.println("Can't find the base RxJava directory"); + return null; + } + + // find end of any potential postfix to /RxJava + int j = path.indexOf("/", i + 6); + + String basePackage = path.substring(0, j + 1) + "src/main/java"; + + String p = basePackage + "/" + parentPackage + "/" + baseClassName + ".java"; + + File f = new File(p); + + if (f.canRead()) { + return f; + } + + System.out.println("Can't read " + p); + return null; + } + + /** + * Repeatedly calls System.gc() and sleeps until the current memory usage + * is less than the given expected usage or the given number of wait loop/time + * has passed. + * @param oneSleep how many milliseconds to sleep after a GC. + * @param maxLoop the maximum number of GC/sleep calls. + * @param expectedMemoryUsage the memory usage in bytes at max + * @return the actual memory usage after the loop + * @throws InterruptedException if the sleep is interrupted + */ + public static long awaitGC(long oneSleep, int maxLoop, long expectedMemoryUsage) throws InterruptedException { + MemoryMXBean bean = ManagementFactory.getMemoryMXBean(); + + System.gc(); + + int i = maxLoop; + while (i-- != 0) { + long usage = bean.getHeapMemoryUsage().getUsed(); + if (usage <= expectedMemoryUsage) { + return usage; + } + System.gc(); + Thread.sleep(oneSleep); + } + return bean.getHeapMemoryUsage().getUsed(); + } + + /** + * Enable thracking of the global errors for the duration of the action. + * @param action the action to run with a list of errors encountered + * @throws Throwable the exception rethrown from the action + */ + public static void withErrorTracking(Consumer> action) throws Throwable { + List errors = trackPluginErrors(); + try { + action.accept(errors); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Assert if the given CompletableFuture fails with a specified error inside an ExecutionException. + * @param cf the CompletableFuture to test + * @param error the error class expected + */ + public static void assertError(CompletableFuture cf, Class error) { + try { + cf.get(); + fail("Should have thrown!"); + } catch (Throwable ex) { + if (!error.isInstance(ex.getCause())) { + ex.printStackTrace(); + fail("Wrong cause: " + ex.getCause()); + } + } + } + + /** + * Syncs the execution of the given runnable with the execution of the + * current thread. + * @param run the other task to run in sync with the current thread + * @param resume the latch to count down after the {@code run} + */ + public static void raceOther(Runnable run, CountDownLatch resume) { + AtomicInteger sync = new AtomicInteger(2); + + Schedulers.single().scheduleDirect(() -> { + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + + run.run(); + + resume.countDown(); + }); + + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + } + + /** + * Inserts a ConditionalSubscriber into the chain to trigger the conditional paths + * without interfering with the requestFusion parts. + * @param the element type + * @return the new FlowableTransformer instance + */ + public static FlowableTransformer conditional() { + return f -> new Flowable() { + @Override + protected void subscribeActual(@NonNull Subscriber<@NonNull ? super T> subscriber) { + f.subscribe(new ForwardingConditionalSubscriber<>(subscriber)); + } + }; + } + + /** + * Wraps a Subscriber and exposes it as a fuseable conditional subscriber without interfering with + * requestFusion. + * @param the element type + */ + static final class ForwardingConditionalSubscriber extends BasicQueueSubscription implements ConditionalSubscriber { + + private static final long serialVersionUID = 365317603608134078L; + + final Subscriber downstream; + + Subscription upstream; + + QueueSubscription qs; + + ForwardingConditionalSubscriber(Subscriber downstream) { + this.downstream = downstream; + } + + @SuppressWarnings("unchecked") + @Override + public void onSubscribe(@NonNull Subscription s) { + this.upstream = s; + if (s instanceof QueueSubscription) { + this.qs = (QueueSubscription)s; + } + downstream.onSubscribe(this); + } + + @Override + public void onNext(@NonNull T t) { + downstream.onNext(t); + } + + @Override + public boolean tryOnNext(@NonNull T t) { + downstream.onNext(t); + return true; + } + + @Override + public void onError(Throwable t) { + downstream.onError(t); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public int requestFusion(int mode) { + return qs != null ? qs.requestFusion(mode) : 0; + } + + @Override + public @Nullable T poll() throws Throwable { + return qs.poll(); + } + + @Override + public boolean isEmpty() { + return qs.isEmpty(); + } + + @Override + public void clear() { + qs.clear(); + } + + @Override + public void request(long n) { + upstream.request(n); + } + + @Override + public void cancel() { + upstream.cancel(); + } + } +} \ No newline at end of file