diff --git a/rxjava-reactive-streams/build.gradle b/rxjava-reactive-streams/build.gradle index ba53f61..a2b57aa 100644 --- a/rxjava-reactive-streams/build.gradle +++ b/rxjava-reactive-streams/build.gradle @@ -3,7 +3,7 @@ description = "Adapter between RxJava and ReactiveStreams" apply plugin: 'java' dependencies { - compile 'io.reactivex:rxjava:1.1.6' + compile 'io.reactivex:rxjava:1.1.8' compile 'org.reactivestreams:reactive-streams:1.0.0' testCompile 'org.reactivestreams:reactive-streams-tck:1.0.0' } diff --git a/rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/SubscriberAdapter.java b/rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/SubscriberAdapter.java index 7337373..f7307c1 100644 --- a/rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/SubscriberAdapter.java +++ b/rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/SubscriberAdapter.java @@ -37,7 +37,6 @@ public void onSubscribe(final Subscription rsSubscription) { if (started.compareAndSet(false, true)) { RxJavaSynchronizedProducer sp = new RxJavaSynchronizedProducer(rsSubscription); rxSubscriber.add(sp); - rxSubscriber.onStart(); rxSubscriber.setProducer(sp); } else { rsSubscription.cancel(); diff --git a/rxjava-reactive-streams/src/test/java/rx/reactivestreams/TckSubscriberBlackboxTest.java b/rxjava-reactive-streams/src/test/java/rx/reactivestreams/TckSubscriberBlackboxTest.java index d6e73b5..50d6d2d 100644 --- a/rxjava-reactive-streams/src/test/java/rx/reactivestreams/TckSubscriberBlackboxTest.java +++ b/rxjava-reactive-streams/src/test/java/rx/reactivestreams/TckSubscriberBlackboxTest.java @@ -37,7 +37,7 @@ public TckSubscriberBlackboxTest() { @Override public Subscriber createSubscriber() { - return new SubscriberAdapter(new rx.Subscriber() { + rx.Subscriber rxSubscriber = new rx.Subscriber() { @Override public void onStart() { @@ -58,7 +58,9 @@ public void onError(Throwable e) { public void onNext(Long aLong) { request(1); } - }); + }; + rxSubscriber.onStart(); // Observable.subscribe() calls this automatically + return new SubscriberAdapter(rxSubscriber); } @Override diff --git a/rxjava-reactive-streams/src/test/java/rx/reactivestreams/test/WrapUnwrap.java b/rxjava-reactive-streams/src/test/java/rx/reactivestreams/test/WrapUnwrap.java new file mode 100644 index 0000000..e9c1c8c --- /dev/null +++ b/rxjava-reactive-streams/src/test/java/rx/reactivestreams/test/WrapUnwrap.java @@ -0,0 +1,46 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.reactivestreams.test; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import rx.*; +import rx.functions.Func1; + +public class WrapUnwrap { + + @Test + public void wrapUnwrap() { + + Observable o = Observable.range(1, 350); + + Observable> p = Observable.just( + RxReactiveStreams.toPublisher(o)).asObservable(); + + for (int u : p.flatMap(new Func1, Observable>() { + @Override + public Observable call(Publisher v) { + return RxReactiveStreams.toObservable(v); + } + }) + .toBlocking() + .toIterable()) { + System.out.println(u); + } + + } +}