diff --git a/implementation/pom.xml b/implementation/pom.xml
index 854abf913..210473424 100644
--- a/implementation/pom.xml
+++ b/implementation/pom.xml
@@ -24,7 +24,7 @@
${smallrye-common-annotation.version}
- org.jctools
+ io.github.jponge.jct
jctools-core
diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java
index 948958916..792bee2ee 100644
--- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java
+++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java
@@ -3,11 +3,6 @@
import java.util.Queue;
import java.util.function.Supplier;
-import org.jctools.queues.atomic.MpscAtomicArrayQueue;
-import org.jctools.queues.atomic.MpscUnboundedAtomicArrayQueue;
-import org.jctools.queues.atomic.SpscAtomicArrayQueue;
-import org.jctools.queues.atomic.SpscChunkedAtomicArrayQueue;
-import org.jctools.queues.atomic.SpscUnboundedAtomicArrayQueue;
import org.jctools.queues.unpadded.MpscUnboundedUnpaddedArrayQueue;
import org.jctools.queues.unpadded.MpscUnpaddedArrayQueue;
import org.jctools.queues.unpadded.SpscChunkedUnpaddedArrayQueue;
@@ -23,27 +18,15 @@ private Queues() {
}
public static Queue createSpscArrayQueue(int capacity) {
- if (Infrastructure.useUnsafeForQueues()) {
- return new SpscUnpaddedArrayQueue<>(capacity);
- } else {
- return new SpscAtomicArrayQueue<>(capacity);
- }
+ return new SpscUnpaddedArrayQueue<>(capacity);
}
public static Queue createSpscUnboundedArrayQueue(int chunkSize) {
- if (Infrastructure.useUnsafeForQueues()) {
- return new SpscUnboundedUnpaddedArrayQueue<>(chunkSize);
- } else {
- return new SpscUnboundedAtomicArrayQueue<>(chunkSize);
- }
+ return new SpscUnboundedUnpaddedArrayQueue<>(chunkSize);
}
public static Queue createSpscChunkedArrayQueue(int capacity) {
- if (Infrastructure.useUnsafeForQueues()) {
- return new SpscChunkedUnpaddedArrayQueue<>(capacity);
- } else {
- return new SpscChunkedAtomicArrayQueue<>(capacity);
- }
+ return new SpscChunkedUnpaddedArrayQueue<>(capacity);
}
public static Supplier> getXsQueueSupplier() {
@@ -104,26 +87,18 @@ public static Supplier> unbounded(int chunkSize) {
* @return the queue
*/
public static Queue createMpscQueue() {
- if (Infrastructure.useUnsafeForQueues()) {
- return new MpscUnboundedUnpaddedArrayQueue<>(Infrastructure.getBufferSizeS());
- } else {
- return new MpscUnboundedAtomicArrayQueue<>(Infrastructure.getBufferSizeS());
- }
+ return new MpscUnboundedUnpaddedArrayQueue<>(Infrastructure.getBufferSizeS());
}
/**
* Creates an unbounded single producer / single consumer queue.
*
* @param chunkSize the chunk size
- * @return the queue
* @param the item type
+ * @return the queue
*/
public static Queue createSpscUnboundedQueue(int chunkSize) {
- if (Infrastructure.useUnsafeForQueues()) {
- return new SpscUnboundedUnpaddedArrayQueue<>(chunkSize);
- } else {
- return new SpscUnboundedAtomicArrayQueue<>(chunkSize);
- }
+ return new SpscUnboundedUnpaddedArrayQueue<>(chunkSize);
}
/**
@@ -134,10 +109,6 @@ public static Queue createSpscUnboundedQueue(int chunkSize) {
* @return a new queue
*/
public static Queue createMpscArrayQueue(int capacity) {
- if (Infrastructure.useUnsafeForQueues()) {
- return new MpscUnpaddedArrayQueue<>(capacity);
- } else {
- return new MpscAtomicArrayQueue<>(capacity);
- }
+ return new MpscUnpaddedArrayQueue<>(capacity);
}
}
diff --git a/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java b/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java
index fc73016bf..fc3ec5a13 100644
--- a/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java
+++ b/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java
@@ -69,8 +69,6 @@ public class Infrastructure {
private static int bufferSizeXs = 32;
private static int bufferSizeS = 256;
- private static boolean useUnsafeForQueues = true;
-
public static void reload() {
clearInterceptors();
reloadUniInterceptors();
@@ -79,27 +77,6 @@ public static void reload() {
multiOverflowDefaultBufferSize = 128;
bufferSizeXs = 32;
bufferSizeS = 256;
- useUnsafeForQueues = true;
- }
-
- /**
- * Should JCTools queues use variants with {@code Unsafe}, or should they use atomic field updaters?
- * Atomic field updates work across JVM and native images, while padded JCTools queues are better suited
- * for JVM mode applications.
- *
- * @return {@code true} when {@code Unsafe} should be reasonably available, {@code false} otherwise
- */
- public static boolean useUnsafeForQueues() {
- return useUnsafeForQueues;
- }
-
- /**
- * Change how JCTools queues should be created ({@code Unsafe} vs atomic field updaters).
- *
- * @param useUnsafeForQueues {@code true} when {@code Unsafe} should be reasonably available, {@code false} otherwise
- */
- public static void setUseUnsafeForQueues(boolean useUnsafeForQueues) {
- Infrastructure.useUnsafeForQueues = useUnsafeForQueues;
}
/**
diff --git a/implementation/src/main/java/module-info.java b/implementation/src/main/java/module-info.java
index 13191b46c..15f97369d 100644
--- a/implementation/src/main/java/module-info.java
+++ b/implementation/src/main/java/module-info.java
@@ -1,7 +1,7 @@
open module io.smallrye.mutiny {
requires transitive io.smallrye.common.annotation;
- requires jctools.core;
+ requires org.jctools.core;
exports io.smallrye.mutiny;
exports io.smallrye.mutiny.converters.multi;
diff --git a/native-tests/pom.xml b/native-tests/pom.xml
deleted file mode 100644
index 3c8c0cf9b..000000000
--- a/native-tests/pom.xml
+++ /dev/null
@@ -1,77 +0,0 @@
-
-
- 4.0.0
-
- io.smallrye.reactive
- mutiny-project
- 999-SNAPSHOT
-
-
- SmallRye Mutiny - Native tests
- Native tests
- native-tests
-
-
-
- io.smallrye.reactive
- mutiny
-
-
- io.smallrye.reactive
- mutiny-test-utils
- test
-
-
- org.junit.jupiter
- junit-jupiter
- test
-
-
- org.junit.jupiter
- junit-jupiter-engine
- test
-
-
-
-
-
- native
-
-
- org.graalvm.buildtools
- junit-platform-native
- ${junit-platform-native.version}
- test
-
-
-
-
-
- org.graalvm.buildtools
- native-maven-plugin
- ${native-maven-plugin.version}
- true
-
-
- test-native
-
- test
-
- test
-
-
-
-
- --no-fallback
- --verbose
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/native-tests/src/test/java/io/smallrye/mutiny/nativetests/SmokeTests.java b/native-tests/src/test/java/io/smallrye/mutiny/nativetests/SmokeTests.java
deleted file mode 100644
index 664c2fafc..000000000
--- a/native-tests/src/test/java/io/smallrye/mutiny/nativetests/SmokeTests.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package io.smallrye.mutiny.nativetests;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import java.time.Duration;
-import java.util.Random;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.EnabledInNativeImage;
-
-import io.smallrye.mutiny.Multi;
-import io.smallrye.mutiny.helpers.test.AssertSubscriber;
-import io.smallrye.mutiny.infrastructure.Infrastructure;
-
-public class SmokeTests {
-
- @Test
- public void concatMap() {
- AssertSubscriber subscriber = AssertSubscriber.create();
- Multi.createFrom().range(1, 10_000)
- .onItem().transformToMultiAndConcatenate(n -> Multi.createFrom().range(n + 2, n + 4))
- .subscribe().withSubscriber(subscriber);
-
- subscriber.request(5);
- subscriber.assertItems(3, 4, 4, 5, 5);
-
- subscriber.request(Long.MAX_VALUE);
- subscriber.assertCompleted();
- assertEquals(19998, subscriber.getItems().size());
- }
-
- @Test
- @EnabledInNativeImage
- public void emitterFailingInNative() {
- assertThrows(ExceptionInInitializerError.class, this::emitterScenario);
- }
-
- @Test
- public void emitterWorkingInNative() {
- runWithAtomicQueues(this::emitterScenario);
- }
-
- private void runWithAtomicQueues(Runnable action) {
- Infrastructure.setUseUnsafeForQueues(false);
- try {
- action.run();
- } finally {
- Infrastructure.setUseUnsafeForQueues(true);
- }
- }
-
- private void emitterScenario() {
- AssertSubscriber subscriber = AssertSubscriber.create();
- Multi.createFrom(). emitter(emitter -> {
- new Thread(() -> {
- Random random = new Random();
- for (int i = 0; i < 10_000; i++) {
- emitter.emit(random.nextInt());
- }
- emitter.complete();
- }).start();
- }).subscribe().withSubscriber(subscriber);
-
- subscriber.request(Long.MAX_VALUE);
- subscriber.awaitCompletion();
- assertEquals(10_000, subscriber.getItems().size());
- }
-
- @Test
- @EnabledInNativeImage
- public void overflowFailingInNative() {
- assertThrows(NoClassDefFoundError.class, this::overflowScenario);
- }
-
- @Test
- public void overflowWorkingInNative() {
- runWithAtomicQueues(this::overflowScenario);
- }
-
- private void overflowScenario() {
- AssertSubscriber subscriber = Multi.createFrom().ticks().every(Duration.ofMillis(10))
- .onOverflow().bufferUnconditionally()
- .subscribe().withSubscriber(AssertSubscriber.create(5));
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- subscriber.cancel();
- }
-}
diff --git a/pom.xml b/pom.xml
index 5938aac7d..aa924f0db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,11 +74,10 @@
bom
math
workshop-examples
- native-tests
- 4.0.1
+ 4.0.2-RC2
1.0.0
1.0.4
3.1.8
@@ -125,8 +124,6 @@
1.1.0
1.9.0
2.7.10
- 0.9.27
- 0.9.7.1
@@ -137,7 +134,7 @@
${project.version}
- org.jctools
+ io.github.jponge.jct
jctools-core
${jctools-core.version}