Skip to content

Commit

Permalink
Fix SubscriberAdapter calling onStart; leads to double calls sometime…
Browse files Browse the repository at this point in the history
…s. (#152)
  • Loading branch information
akarnokd authored Jul 26, 2016
1 parent f648934 commit 3d7ec04
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 4 deletions.
2 changes: 1 addition & 1 deletion rxjava-reactive-streams/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ description = "Adapter between RxJava and ReactiveStreams"
apply plugin: 'java'

dependencies {
compile 'io.reactivex:rxjava:1.1.6'
compile 'io.reactivex:rxjava:1.1.8'
compile 'org.reactivestreams:reactive-streams:1.0.0'
testCompile 'org.reactivestreams:reactive-streams-tck:1.0.0'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public void onSubscribe(final Subscription rsSubscription) {
if (started.compareAndSet(false, true)) {
RxJavaSynchronizedProducer sp = new RxJavaSynchronizedProducer(rsSubscription);
rxSubscriber.add(sp);
rxSubscriber.onStart();
rxSubscriber.setProducer(sp);
} else {
rsSubscription.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public TckSubscriberBlackboxTest() {

@Override
public Subscriber<Long> createSubscriber() {
return new SubscriberAdapter<Long>(new rx.Subscriber<Long>() {
rx.Subscriber<Long> rxSubscriber = new rx.Subscriber<Long>() {

@Override
public void onStart() {
Expand All @@ -58,7 +58,9 @@ public void onError(Throwable e) {
public void onNext(Long aLong) {
request(1);
}
});
};
rxSubscriber.onStart(); // Observable.subscribe() calls this automatically
return new SubscriberAdapter<Long>(rxSubscriber);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.reactivestreams.test;

import org.reactivestreams.Publisher;
import org.testng.annotations.Test;

import rx.*;
import rx.functions.Func1;

public class WrapUnwrap {

@Test
public void wrapUnwrap() {

Observable<Integer> o = Observable.range(1, 350);

Observable<Publisher<Integer>> p = Observable.just(
RxReactiveStreams.toPublisher(o)).asObservable();

for (int u : p.flatMap(new Func1<Publisher<Integer>, Observable<Integer>>() {
@Override
public Observable<Integer> call(Publisher<Integer> v) {
return RxReactiveStreams.toObservable(v);
}
})
.toBlocking()
.toIterable()) {
System.out.println(u);
}

}
}

0 comments on commit 3d7ec04

Please sign in to comment.