Skip to content

Commit

Permalink
Added pesist-test
Browse files Browse the repository at this point in the history
  • Loading branch information
t-novak committed Jan 24, 2018
1 parent 3bfb8c7 commit 3fe6816
Showing 1 changed file with 37 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -623,4 +623,41 @@ public void testWatermarkSchedulerWithLatecomers() throws InterruptedException,
1999, 1998, 1997, 1996, 1995);

}

@Test
public void persistTest() throws InterruptedException, ExecutionException {
Dataset<Integer> ints = flow.createInput(
ListDataSource.unbounded(
Arrays.asList(0, 1, 2, 3)));

// identity map
Dataset<Integer> output = FlatMap.of(ints)
.using((Integer e, Collector<Integer> c) -> c.collect(e))
.output();

// collector of outputs
ListDataSink<Integer> outputSink = ListDataSink.get();
output.persist(outputSink);
// FlatMap
// .of(output)
// .using((Integer elem, Collector<Integer> collector) -> collector.collect(elem))
// .output()
// .persist(outputSink);

// another sink to avoid NPE due to the unused dataset
ListDataSink<Integer> anotherSink = ListDataSink.get();
FlatMap
.of(output)
.using((Integer elem, Collector<Integer> collector) -> collector.collect(elem))
.output()
.persist(anotherSink);

executor.submit(flow).get();

List<Integer> outputs = outputSink.getOutputs();
DatasetAssert.unorderedEquals(
outputs,
0, 1, 2, 3);
}

}

0 comments on commit 3fe6816

Please sign in to comment.