Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch command test #91

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions src/com/nutrons/framework/commands/Command.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package com.nutrons.framework.commands;

import static com.nutrons.framework.util.FlowOperators.toFlow;

import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Publisher;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static com.nutrons.framework.util.FlowOperators.toFlow;
import org.reactivestreams.Publisher;

public class Command implements CommandWorkUnit {

Expand Down Expand Up @@ -79,15 +78,18 @@ public static Command parallel(Command... commands) {
*/
public static Command fromSwitch(Publisher<? extends CommandWorkUnit> commandStream) {
return new Command(x -> Flowable.defer(() ->
Flowable.switchOnNext(Flowable.fromPublisher(commandStream).map(y -> y.execute(x))
.subscribeOn(Schedulers.io()))).scan((a, b) -> {
a.run();
return b;
}));
Flowable.fromPublisher(commandStream)
.concatMap(y -> Flowable.<Terminator>just(FlattenedTerminator.from(y.execute(x)))
.subscribeOn(Schedulers.io()))
.scan((a, b) -> {
a.run();
return b;
})));
}

public Command addFinalTerminator(Terminator terminator) {
return Command.just(x -> this.source.execute(x).flatMap(y -> Flowable.<Terminator>just(y, terminator)).subscribeOn(Schedulers.io()));
return Command.just(x -> this.source.execute(x)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need on backpressure drop here?

.flatMap(y -> Flowable.<Terminator>just(y, terminator)).subscribeOn(Schedulers.io()));
}

/**
Expand Down Expand Up @@ -125,7 +127,7 @@ public Command terminable(Publisher<?> terminator) {
*/
public Command endsWhen(Publisher<?> terminator, boolean terminatesAtEnd) {
return Command.just(x -> {
Flowable<Terminator> sourceTerminator = this.execute(terminatesAtEnd);
Flowable<? extends Terminator> sourceTerminator = this.execute(terminatesAtEnd);
Terminator multi = FlattenedTerminator.from(sourceTerminator);
return Flowable.defer(() -> Flowable.<Terminator>never().takeUntil(terminator)
.mergeWith(Flowable.just(multi::run)));
Expand All @@ -137,7 +139,8 @@ public Command endsWhen(Publisher<?> terminator, boolean terminatesAtEnd) {
* will only complete once endCondition returns true.
*/
public Command until(Supplier<Boolean> endCondition) {
ConnectableFlowable<?> terminator = emptyPulse.map(x -> endCondition.get()).filter(x -> x).onBackpressureDrop().publish();
ConnectableFlowable<?> terminator = emptyPulse.map(x -> endCondition.get()).filter(x -> x)
.onBackpressureDrop().publish();
terminator.connect();
return this.terminable(terminator);
}
Expand All @@ -163,16 +166,21 @@ public Command delayFinish(long delay, TimeUnit unit) {
return this.terminable(Flowable.timer(delay, unit));
}

/**
* End and terminate this command only after the specified time has passed.
*/
public Command killAfter(long delay, TimeUnit unit) {
return Command.just(x -> {
Flowable<Terminator> terms = this.terminable(Flowable.timer(delay, unit)).execute(x);
Flowable<? extends Terminator> terms = this.terminable(Flowable.timer(delay, unit))
.execute(x);
return terms;
});
}

@Override
public Flowable<Terminator> execute(boolean selfTerminating) {
Flowable<Terminator> terms = source.execute(selfTerminating).subscribeOn(Schedulers.io());
public Flowable<? extends Terminator> execute(boolean selfTerminating) {
Flowable<? extends Terminator> terms = source.execute(selfTerminating)
.subscribeOn(Schedulers.io());
if (selfTerminating) {
terms.toList().subscribe(x -> Observable.fromIterable(x).blockingSubscribe(Terminator::run));
}
Expand Down
2 changes: 1 addition & 1 deletion src/com/nutrons/framework/commands/CommandWorkUnit.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
import io.reactivex.Flowable;

public interface CommandWorkUnit {
Flowable<Terminator> execute(boolean selfTerminating);
Flowable<? extends Terminator> execute(boolean selfTerminating);
}
6 changes: 3 additions & 3 deletions src/com/nutrons/framework/commands/FlattenedTerminator.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ class FlattenedTerminator implements Terminator {

private final AtomicBoolean lock;
private final ArrayList<Terminator> terminators;
private final Flowable<Terminator> terminatorStream;
private final Flowable<? extends Terminator> terminatorStream;

private FlattenedTerminator(Flowable<Terminator> terminators) {
private FlattenedTerminator(Flowable<? extends Terminator> terminators) {
this.lock = new AtomicBoolean(false);
this.terminators = new ArrayList<>();
this.terminatorStream = terminators;
Expand All @@ -29,7 +29,7 @@ private FlattenedTerminator(Flowable<Terminator> terminators) {
});
}

static FlattenedTerminator from(Flowable<Terminator> terminators) {
static FlattenedTerminator from(Flowable<? extends Terminator> terminators) {
return new FlattenedTerminator(terminators);
}

Expand Down
30 changes: 4 additions & 26 deletions test/com/nutrons/framework/test/MultiCommandTest.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package com.nutrons.framework.test;

import static com.nutrons.framework.commands.Command.parallel;
import static junit.framework.TestCase.assertTrue;

import com.nutrons.framework.commands.Command;
import com.nutrons.framework.commands.TerminatorWrapper;
import io.reactivex.Flowable;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static com.nutrons.framework.commands.Command.parallel;
import static junit.framework.TestCase.assertTrue;

public class MultiCommandTest {
private Command delay;

Expand Down Expand Up @@ -41,24 +39,4 @@ public void testOneThenAnother() throws InterruptedException {
Thread.sleep(2000);
assertTrue(record[0] == 0);
}

@Test
public void testSwitch() throws InterruptedException {
int[] record = new int[1];
record[0] = 0;
Command inc = Command.just(x -> {
synchronized (record) {
record[0] += 1;
}
return Flowable.just(new TerminatorWrapper(() -> {
synchronized (record) {
record[0] -= 1;
}
}));
});
Command.fromSwitch(Flowable.interval(1, TimeUnit.SECONDS).map(x -> inc).take(5))
.execute(true).blockingSubscribe();
Thread.sleep(2000);
assertTrue(record[0] == 0);
}
}
20 changes: 10 additions & 10 deletions test/com/nutrons/framework/test/TestCommand.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
package com.nutrons.framework.test;

import static com.nutrons.framework.commands.Command.parallel;
import static com.nutrons.framework.commands.Command.serial;
import static junit.framework.TestCase.assertTrue;

import com.nutrons.framework.commands.Command;
import com.nutrons.framework.commands.Terminator;
import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static com.nutrons.framework.commands.Command.parallel;
import static com.nutrons.framework.commands.Command.serial;
import static junit.framework.TestCase.assertTrue;

public class TestCommand {

private Command delay;

static void waitForCommand(Flowable<Terminator> commandExecution) {
static void waitForCommand(Flowable<? extends Terminator> commandExecution) {
commandExecution.blockingSubscribe();
}

Expand Down Expand Up @@ -72,7 +71,7 @@ public void inParallelTimed() {
public void testTerminable() throws InterruptedException {
final long start = System.currentTimeMillis();
PublishProcessor pp = PublishProcessor.create();
Flowable<Terminator> td = serial(delay, delay, delay, delay)
final Flowable<? extends Terminator> td = serial(delay, delay, delay, delay)
.terminable(pp).execute(true);
Thread.sleep(3000);
pp.onNext(new Object());
Expand All @@ -86,7 +85,8 @@ public void testUntil() throws InterruptedException {
int[] record = new int[2];
assertTrue(record[0] == 0);
long start = System.currentTimeMillis();
Flowable<Terminator> td = Command.fromAction(() -> record[0] = 1).until(() -> record[1] == 1).execute(true);
Flowable<? extends Terminator> td = Command.fromAction(() -> record[0] = 1)
.until(() -> record[1] == 1).execute(true);
Flowable.timer(1, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).subscribe(x -> record[1] = 1);
waitForCommand(td);
assertTrue(System.currentTimeMillis() - 1000 > start);
Expand All @@ -109,7 +109,7 @@ public void testStartable() {
public void testWhen() throws InterruptedException {
int[] record = new int[2];
assertTrue(record[0] == 0);
final Flowable<Terminator> td = Command.fromAction(() -> record[0] = 1)
final Flowable<? extends Terminator> td = Command.fromAction(() -> record[0] = 1)
.when(() -> record[1] == 1)
.execute(true);
Thread.sleep(1000);
Expand Down
94 changes: 94 additions & 0 deletions test/com/nutrons/framework/test/TestSwitchCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.nutrons.framework.test;

import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.assertTrue;

import com.nutrons.framework.commands.Command;
import com.nutrons.framework.commands.TerminatorWrapper;
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

public class TestSwitchCommand {

@Test
public void testSwitchStart() throws InterruptedException {
List<Long> list = new ArrayList<>();
Flowable<Command> commandStream = Flowable.interval(100, TimeUnit.MILLISECONDS)
.map(x -> putNumber(list, x)).take(5);
Command.fromSwitch(commandStream).execute(true);
Thread.sleep(300);
assertFalse(list.contains(4));
Thread.sleep(500);
for (long i = 0; i < 5; i++) {
assertTrue(list.contains(i));
}
}

private Command putNumber(List<Long> list, long number) {
return Command.fromAction(() -> list.add(number));
}


@Test
public void testSwitchStartAndStop() throws InterruptedException {
int[] record = new int[1];
record[0] = 0;
Command inc = Command.just(x -> {
synchronized (record) {
record[0] += 1;
}
return Flowable.just(new TerminatorWrapper(() -> {
synchronized (record) {
record[0] -= 1;
}
}));
});
Command.fromSwitch(Flowable.interval(1, TimeUnit.SECONDS).map(x -> inc).take(5))
.execute(true).blockingSubscribe();
Thread.sleep(2000);
assertTrue(record[0] == 0);
}

@Test
public void testSwitchTerminateRealtime() throws InterruptedException {
int[] record = new int[1];
long start = System.currentTimeMillis();
record[0] = 0;
Command inc = Command.just(x -> Flowable.just(new TerminatorWrapper(() -> {
synchronized (record) {
record[0] += 1;
}
})));
Command.fromSwitch(Flowable.interval(1, TimeUnit.SECONDS).map(x -> inc).take(5))
.execute(true);
assertTrue(System.currentTimeMillis() - start < 1000);
Thread.sleep(2000);
assertTrue(record[0] < 5);
assertTrue(record[0] > 0);
}

@Test
public void testSwitchNotTerminating() throws InterruptedException {
Command doesntFinish = Command.just(x -> Flowable.just(() -> assertTrue(false)));
doesntFinish.execute(false);
Thread.sleep(1000);
Command justOne = Command.fromSwitch(Flowable.<Command>never()
.mergeWith(Flowable.just(doesntFinish)));
justOne.execute(false);
Thread.sleep(1000);
justOne.execute(true);
Thread.sleep(1000);
}

@Test(expected = RuntimeException.class)
public void testSwitchTerminatesOnNext() {
Command doesntFinish = Command.just(x -> Flowable.just(() -> {
throw new RuntimeException();
}));
Command two = Command.fromSwitch(Flowable.just(doesntFinish, doesntFinish));
two.execute(false).blockingSubscribe();
}
}