Skip to content

Commit

Permalink
Unify Collector factories used in tests (#949)
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored Sep 16, 2024
1 parent a0001e7 commit 9fbf2cd
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,12 @@ Stream<DynamicTest> shouldRejectInvalidParallelism() {
})));
}

protected record CollectorDefinition<T, R>(String name, CollectorFactory<T, R> factory) {
static <T, R> CollectorDefinition<T, R> collector(String name, CollectorFactory<T, R> collector) {
protected record CollectorDefinition<T, R>(String name, Factory.CollectorFactoryWithParallelism<T, R> factory) {
static <T, R> CollectorDefinition<T, R> collector(String name, Factory.CollectorFactoryWithParallelism<T, R> collector) {
return new CollectorDefinition<>(name, collector);
}
}

@FunctionalInterface
private interface CollectorFactory<T, R> {
Collector<T, ?, List<R>> collector(Function<T, R> f, Integer p);
}

private static Executor e() {
return Executors.newCachedThreadPool();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static Stream<CollectorDefinition<Integer, Integer>> allOrdered() {
Stream<DynamicTest> shouldProcessEmpty() {
return all()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
assertThat(Stream.<Integer>empty().collect(c.collector().apply(i -> i))).isEmpty();
assertThat(Stream.<Integer>empty().collect(c.collector().collector(i -> i))).isEmpty();
}));
}

Expand All @@ -78,7 +78,7 @@ Stream<DynamicTest> shouldProcessAllElements() {
return all()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
var list = IntStream.range(0, 100).boxed().toList();
List<Integer> result = list.stream().collect(c.collector().apply(i -> i));
List<Integer> result = list.stream().collect(c.collector().collector(i -> i));
assertThat(result).containsExactlyInAnyOrderElementsOf(list);
}));
}
Expand All @@ -88,7 +88,7 @@ Stream<DynamicTest> shouldProcessAllElementsInOrder() {
return allOrdered()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
var list = IntStream.range(0, 100).boxed().toList();
List<Integer> result = list.stream().collect(c.collector().apply(i -> i));
List<Integer> result = list.stream().collect(c.collector().collector(i -> i));
assertThat(result).containsAnyElementsOf(list);
}));
}
Expand All @@ -102,7 +102,7 @@ Stream<DynamicTest> shouldStartProcessingImmediately() {
Thread.startVirtualThread(() -> {
Stream.iterate(0, i -> i + 1)
.limit(100)
.collect(c.collector().apply(i -> returnWithDelay(counter.incrementAndGet(), ofSeconds(1))));
.collect(c.collector().collector(i -> returnWithDelay(counter.incrementAndGet(), ofSeconds(1))));
});

await()
Expand All @@ -121,7 +121,7 @@ Stream<DynamicTest> shouldInterruptOnException() {
var latch = new CountDownLatch(size);

assertThatThrownBy(() -> IntStream.range(0, size).boxed()
.collect(c.collector().apply(i -> {
.collect(c.collector().collector(i -> {
try {
latch.countDown();
latch.await();
Expand All @@ -140,8 +140,8 @@ Stream<DynamicTest> shouldInterruptOnException() {
}));
}

record CollectorDefinition<T, R>(String name, Function<Function<T, R>, Collector<T, ?, List<R>>> collector) {
static <T, R> CollectorDefinition<T, R> collector(String name, Function<Function<T, R>, Collector<T, ?, List<R>>> collector) {
record CollectorDefinition<T, R>(String name, Factory.CollectorFactory<T, R> collector) {
static <T, R> CollectorDefinition<T, R> collector(String name, Factory.CollectorFactory<T, R> collector) {
return new CollectorDefinition<>(name, collector);
}
}
Expand Down
9 changes: 2 additions & 7 deletions src/test/java/com/pivovarit/collectors/test/BatchingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,8 @@ Stream<DynamicTest> shouldProcessOnExactlyNThreads() {
}));
}

@FunctionalInterface
private interface CollectorFactory<T, R> {
Collector<T, ?, List<R>> collector(Function<T, R> f, Integer p);
}

record CollectorDefinition<T, R>(String name, CollectorFactory<T, R> collector) {
static <T, R> CollectorDefinition<T, R> collector(String name, CollectorFactory<T, R> collector) {
record CollectorDefinition<T, R>(String name, Factory.CollectorFactoryWithParallelism<T, R> collector) {
static <T, R> CollectorDefinition<T, R> collector(String name, Factory.CollectorFactoryWithParallelism<T, R> collector) {
return new CollectorDefinition<>(name, collector);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static com.pivovarit.collectors.Collectors.boundedCollectors;
import static com.pivovarit.collectors.test.Factory.boundedCollectors;

class ExecutorPollutionTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Stream;

import static com.pivovarit.collectors.test.ExecutorValidationTest.CollectorDefinition.collector;
Expand Down Expand Up @@ -49,14 +45,11 @@ Stream<DynamicTest> shouldRejectInvalidRejectedExecutionHandlerFactory() {
})));
}

protected record CollectorDefinition<T, R>(String name, CollectorFactory<T, R> factory) {
static <T, R> CollectorDefinition<T, R> collector(String name, CollectorFactory<T, R> factory) {
protected record CollectorDefinition<T, R>(String name, Factory.CollectorFactoryWithExecutor<T, R> factory) {
static <T, R> CollectorDefinition<T, R> collector(String name, Factory.CollectorFactoryWithExecutor<T, R> factory) {
return new CollectorDefinition<>(name, factory);
}
}

@FunctionalInterface
private interface CollectorFactory<T, R> {
Collector<T, ?, List<R>> collector(Function<T, R> f, Executor executor);
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.pivovarit.collectors;
package com.pivovarit.collectors.test;

import com.pivovarit.collectors.ParallelCollectors;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collector;
Expand All @@ -9,13 +13,13 @@
import static com.pivovarit.collectors.ParallelCollectors.Batching.parallel;
import static java.util.stream.Collectors.toList;

public final class Collectors {
public final class Factory {

private Collectors() {
private Factory() {
throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
}

public static Stream<Map.Entry<String, BoundedCollectorFactory<Integer>>> boundedCollectors() {
public static Stream<Map.Entry<String, CollectorFactoryWithParallelismAndExecutor<Integer>>> boundedCollectors() {
return Stream.of(
Map.entry("parallel()", (f, e, p) -> ParallelCollectors.parallel(f, e, p)),
Map.entry("parallel(toList())", (f, e, p) -> ParallelCollectors.parallel(f, toList(), e, p)),
Expand All @@ -27,7 +31,33 @@ public static Stream<Map.Entry<String, BoundedCollectorFactory<Integer>>> bounde
Map.entry("parallelToOrderedStream() (batching)", (f, e, p) -> ParallelCollectors.Batching.parallelToOrderedStream(f, e, p)));
}

public interface BoundedCollectorFactory<T> {
@FunctionalInterface
interface CollectorFactoryWithParallelismAndExecutor<T> {
Collector<T, ?, ?> apply(Function<T, ?> function, Executor executorService, int parallelism);
}

@FunctionalInterface
interface CollectorFactoryWithExecutor<T, R> {
Collector<T, ?, List<R>> collector(Function<T, R> f, Executor executor);
}

@FunctionalInterface
interface CollectorFactoryWithParallelism<T, R> {
Collector<T, ?, List<R>> collector(Function<T, R> f, Integer p);
}

@FunctionalInterface
interface CollectorFactory<T, R> {
Collector<T, ?, List<R>> collector(Function<T, R> f);
}

@FunctionalInterface
interface StreamingCollectorFactory<T, R> {
Collector<T, ?, Stream<R>> collector(Function<T, R> f);
}

@FunctionalInterface
interface AsyncCollectorFactory<T, R> {
Collector<T, ?, CompletableFuture<List<R>>> collector(Function<T, R> f);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,12 @@ Stream<DynamicTest> shouldNotBlockTheCallingThread() {
}));
}

protected record CollectorDefinition<T, R>(String name, CollectorFactory<T, R> factory) {
static <T, R> CollectorDefinition<T, R> collector(String name, CollectorFactory<T, R> collector) {
protected record CollectorDefinition<T, R>(String name, Factory.AsyncCollectorFactory<T, R> factory) {
static <T, R> CollectorDefinition<T, R> collector(String name, Factory.AsyncCollectorFactory<T, R> collector) {
return new CollectorDefinition<>(name, collector);
}
}

@FunctionalInterface
private interface CollectorFactory<T, R> {
Collector<T, ?, CompletableFuture<List<R>>> collector(Function<T, R> f);
}

private static Executor e() {
return Executors.newCachedThreadPool();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,9 @@ class RejectedExecutionHandlingTest {

private static Stream<CollectorDefinition<Integer, Integer>> allWithCustomExecutors() {
return Stream.of(
collector("parallel(e)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e), c -> c.thenApply(Stream::toList)
.join())),
collector("parallel(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e, 4), c -> c.thenApply(Stream::toList)
.join())),
collector("parallel(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e, 4), c -> c.thenApply(Stream::toList)
.join())),
collector("parallel(e)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e), c -> c.thenApply(Stream::toList).join())),
collector("parallel(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e, 4), c -> c.thenApply(Stream::toList).join())),
collector("parallel(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e, 4), c -> c.thenApply(Stream::toList).join())),
collector("parallelToStream(e)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e), Stream::toList)),
collector("parallelToStream(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e, 4), Stream::toList)),
collector("parallelToStream(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e, 4), Stream::toList)),
Expand Down Expand Up @@ -78,14 +75,9 @@ Stream<DynamicTest> shouldRejectInvalidRejectedExecutionHandlerWhenParallelismOn
}));
}

protected record CollectorDefinition<T, R>(String name, CollectorFactory<T, R> factory) {
static <T, R> CollectorDefinition<T, R> collector(String name, CollectorFactory<T, R> factory) {
protected record CollectorDefinition<T, R>(String name, Factory.CollectorFactoryWithExecutor<T, R> factory) {
static <T, R> CollectorDefinition<T, R> collector(String name, Factory.CollectorFactoryWithExecutor<T, R> factory) {
return new CollectorDefinition<>(name, factory);
}
}

@FunctionalInterface
private interface CollectorFactory<T, R> {
Collector<T, ?, List<R>> collector(Function<T, R> f, Executor executor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,12 @@ Stream<DynamicTest> shouldCollectInOriginalOrder() {
}));
}

protected record CollectorDefinition<T, R>(String name, CollectorFactory<T, R> factory) {
static <T, R> CollectorDefinition<T, R> collector(String name, CollectorFactory<T, R> collector) {
protected record CollectorDefinition<T, R>(String name, Factory.StreamingCollectorFactory<T, R> factory) {
static <T, R> CollectorDefinition<T, R> collector(String name, Factory.StreamingCollectorFactory<T, R> collector) {
return new CollectorDefinition<>(name, collector);
}
}

@FunctionalInterface
private interface CollectorFactory<T, R> {
Collector<T, ?, Stream<R>> collector(Function<T, R> f);
}

private static Executor e() {
return Executors.newCachedThreadPool();
}
Expand Down

0 comments on commit 9fbf2cd

Please sign in to comment.