Skip to content

Commit

Permalink
Infrastructure support to switch between unpadded and atomic JCTools …
Browse files Browse the repository at this point in the history
…variants
  • Loading branch information
jponge committed Oct 18, 2023
1 parent 36cca74 commit 924bb7c
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
import java.util.Queue;
import java.util.function.Supplier;

import org.jctools.queues.MpscArrayQueue;
import org.jctools.queues.MpscLinkedQueue;
import org.jctools.queues.SpscArrayQueue;
import org.jctools.queues.SpscUnboundedArrayQueue;
import org.jctools.queues.atomic.MpscAtomicArrayQueue;
import org.jctools.queues.atomic.MpscLinkedAtomicQueue;
import org.jctools.queues.atomic.SpscAtomicArrayQueue;
import org.jctools.queues.atomic.SpscUnboundedAtomicArrayQueue;
import org.jctools.queues.unpadded.MpscLinkedUnpaddedQueue;
import org.jctools.queues.unpadded.MpscUnpaddedArrayQueue;
import org.jctools.queues.unpadded.SpscUnboundedUnpaddedArrayQueue;
import org.jctools.queues.unpadded.SpscUnpaddedArrayQueue;

import io.smallrye.mutiny.infrastructure.Infrastructure;

Expand All @@ -22,17 +26,24 @@ private Queues() {
// avoid direct instantiation
}

static final Supplier EMPTY_QUEUE_SUPPLIER = EmptyQueue::new;
static final Supplier SINGLETON_QUEUE_SUPPLIER = SingletonQueue::new;

static final Supplier XS_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(Infrastructure.getBufferSizeXs());
static final Supplier S_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(Infrastructure.getBufferSizeS());
public static <T> Queue<T> createSpscArrayQueue(int size) {
if (Infrastructure.useUnsafeForQueues()) {
return new SpscUnpaddedArrayQueue<>(size);
} else {
return new SpscAtomicArrayQueue<>(size);
}
}

static final Supplier UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscUnboundedArrayQueue<>(Infrastructure.getBufferSizeS());
static final Supplier XS_UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscUnboundedArrayQueue<>(Infrastructure.getBufferSizeXs());
public static <T> Queue<T> createSpscUnboundedArrayQueue(int size) {
if (Infrastructure.useUnsafeForQueues()) {
return new SpscUnboundedUnpaddedArrayQueue<>(size);
} else {
return new SpscUnboundedAtomicArrayQueue<>(size);
}
}

public static <T> Supplier<Queue<T>> getXsQueueSupplier() {
return (Supplier<Queue<T>>) XS_QUEUE_SUPPLIER;
return () -> createSpscArrayQueue(Infrastructure.getBufferSizeXs());
}

/**
Expand All @@ -46,26 +57,26 @@ public static <T> Supplier<Queue<T>> getXsQueueSupplier() {
*/
public static <T> Supplier<Queue<T>> get(int bufferSize) {
if (bufferSize == Infrastructure.getBufferSizeXs()) {
return XS_QUEUE_SUPPLIER;
return () -> createSpscArrayQueue(Infrastructure.getBufferSizeXs());
}

if (bufferSize == Infrastructure.getBufferSizeS()) {
return S_QUEUE_SUPPLIER;
return () -> createSpscArrayQueue(Infrastructure.getBufferSizeS());
}

if (bufferSize == 1) {
return SINGLETON_QUEUE_SUPPLIER;
return SingletonQueue::new;
}

if (bufferSize == 0) {
return EMPTY_QUEUE_SUPPLIER;
return EmptyQueue::new;
}

final int computedSize = Math.max(8, bufferSize);
if (computedSize > TOO_LARGE_TO_BE_BOUNDED) {
return UNBOUNDED_QUEUE_SUPPLIER;
return () -> createSpscUnboundedArrayQueue(Infrastructure.getBufferSizeS());
} else {
return () -> new SpscArrayQueue<>(computedSize);
return () -> createSpscArrayQueue(computedSize);
}
}

Expand All @@ -80,11 +91,11 @@ public static <T> Supplier<Queue<T>> get(int bufferSize) {
@SuppressWarnings("unchecked")
public static <T> Supplier<Queue<T>> unbounded(int size) {
if (size == Infrastructure.getBufferSizeXs()) {
return XS_UNBOUNDED_QUEUE_SUPPLIER;
return () -> createSpscUnboundedArrayQueue(Infrastructure.getBufferSizeXs());
} else if (size == Integer.MAX_VALUE || size == Infrastructure.getBufferSizeS()) {
return UNBOUNDED_QUEUE_SUPPLIER;
return () -> createSpscUnboundedArrayQueue(Infrastructure.getBufferSizeS());
} else {
return () -> new SpscUnboundedArrayQueue<>(size);
return () -> createSpscUnboundedArrayQueue(size);
}
}

Expand All @@ -95,7 +106,11 @@ public static <T> Supplier<Queue<T>> unbounded(int size) {
* @return the queue
*/
public static <T> Queue<T> createMpscQueue() {
return new MpscLinkedQueue<>();
if (Infrastructure.useUnsafeForQueues()) {
return new MpscLinkedUnpaddedQueue<>();
} else {
return new MpscLinkedAtomicQueue<>();
}
}

/**
Expand All @@ -106,7 +121,11 @@ public static <T> Queue<T> createMpscQueue() {
* @param <T> the item type
*/
public static <T> Queue<T> createSpscUnboundedQueue(int size) {
return new SpscUnboundedArrayQueue<>(size);
if (Infrastructure.useUnsafeForQueues()) {
return new SpscUnboundedUnpaddedArrayQueue<>(size);
} else {
return new SpscUnboundedAtomicArrayQueue<>(size);
}
}

/**
Expand All @@ -117,7 +136,11 @@ public static <T> Queue<T> createSpscUnboundedQueue(int size) {
* @return a new queue
*/
public static <T> Queue<T> createMpscArrayQueue(int size) {
return new MpscArrayQueue<>(size);
if (Infrastructure.useUnsafeForQueues()) {
return new MpscUnpaddedArrayQueue<>(size);
} else {
return new MpscAtomicArrayQueue<>(size);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,12 @@
package io.smallrye.mutiny.infrastructure;

import static io.smallrye.mutiny.helpers.ParameterValidation.*;
import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull;
import static io.smallrye.mutiny.helpers.ParameterValidation.positive;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.function.*;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand Down Expand Up @@ -69,6 +50,8 @@ public class Infrastructure {
private static int bufferSizeXs = 32;
private static int bufferSizeS = 256;

private static boolean useUnsafeForQueues = true;

public static void reload() {
clearInterceptors();
reloadUniInterceptors();
Expand All @@ -77,6 +60,27 @@ public static void reload() {
multiOverflowDefaultBufferSize = 128;
bufferSizeXs = 32;
bufferSizeS = 256;
useUnsafeForQueues = true;
}

/**
* Should JCTools queues use variants with {@code Unsafe}, or should they use atomic field updaters?
* Atomic field updates work across JVM and native images, while padded JCTools queues are better suited
* for JVM mode applications.
*
* @return {@code true} when {@code Unsafe} should be reasonably available, {@code false} otherwise
*/
public static boolean useUnsafeForQueues() {
return useUnsafeForQueues;
}

/**
* Change how JCTools queues should be created ({@code Unsafe} vs atomic field updaters).
*
* @param useUnsafeForQueues {@code true} when {@code Unsafe} should be reasonably available, {@code false} otherwise
*/
public static void setUseUnsafeForQueues(boolean useUnsafeForQueues) {
Infrastructure.useUnsafeForQueues = useUnsafeForQueues;
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.smallrye.mutiny.nativetests;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.time.Duration;
import java.util.Random;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledInNativeImage;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import io.smallrye.mutiny.infrastructure.Infrastructure;

public class SmokeTests {

@Test
public void concatMap() {
AssertSubscriber<Integer> subscriber = AssertSubscriber.create();
Multi.createFrom().range(1, 10_000)
.onItem().transformToMultiAndConcatenate(n -> Multi.createFrom().range(n + 2, n + 4))
.subscribe().withSubscriber(subscriber);

subscriber.request(5);
subscriber.assertItems(3, 4, 4, 5, 5);

subscriber.request(Long.MAX_VALUE);
subscriber.assertCompleted();
assertEquals(19998, subscriber.getItems().size());
}

@Test
@EnabledInNativeImage
public void emitterFailingInNative() {
assertThrows(ExceptionInInitializerError.class, this::emitterScenario);
}

@Test
public void emitterWorkingInNative() {
runWithAtomicQueues(this::emitterScenario);
}

private void runWithAtomicQueues(Runnable action) {
Infrastructure.setUseUnsafeForQueues(false);
try {
action.run();
} finally {
Infrastructure.setUseUnsafeForQueues(true);
}
}

private void emitterScenario() {
AssertSubscriber<Integer> subscriber = AssertSubscriber.create();
Multi.createFrom().<Integer> emitter(emitter -> {
new Thread(() -> {
Random random = new Random();
for (int i = 0; i < 10_000; i++) {
emitter.emit(random.nextInt());
}
emitter.complete();
}).start();
}).subscribe().withSubscriber(subscriber);

subscriber.request(Long.MAX_VALUE);
subscriber.awaitCompletion();
assertEquals(10_000, subscriber.getItems().size());
}

@Test
@EnabledInNativeImage
public void overflowFailingInNative() {
assertThrows(NoClassDefFoundError.class, this::overflowScenario);
}

@Test
public void overflowWorkingInNative() {
runWithAtomicQueues(this::overflowScenario);
}

private void overflowScenario() {
AssertSubscriber<Long> subscriber = Multi.createFrom().ticks().every(Duration.ofMillis(10))
.onOverflow().bufferUnconditionally()
.subscribe().withSubscriber(AssertSubscriber.create(5));
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
subscriber.cancel();
}
}

0 comments on commit 924bb7c

Please sign in to comment.