diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java index e7944ddca..023bad2ba 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java @@ -4,11 +4,13 @@ import java.util.function.Supplier; import org.jctools.queues.atomic.MpscAtomicArrayQueue; -import org.jctools.queues.atomic.MpscLinkedAtomicQueue; +import org.jctools.queues.atomic.MpscUnboundedAtomicArrayQueue; import org.jctools.queues.atomic.SpscAtomicArrayQueue; +import org.jctools.queues.atomic.SpscChunkedAtomicArrayQueue; import org.jctools.queues.atomic.SpscUnboundedAtomicArrayQueue; -import org.jctools.queues.unpadded.MpscLinkedUnpaddedQueue; +import org.jctools.queues.unpadded.MpscUnboundedUnpaddedArrayQueue; import org.jctools.queues.unpadded.MpscUnpaddedArrayQueue; +import org.jctools.queues.unpadded.SpscChunkedUnpaddedArrayQueue; import org.jctools.queues.unpadded.SpscUnboundedUnpaddedArrayQueue; import org.jctools.queues.unpadded.SpscUnpaddedArrayQueue; @@ -17,11 +19,6 @@ @SuppressWarnings({ "rawtypes", "unchecked" }) public class Queues { - /** - * Queues with a requested with a capacity greater than this value are unbounded. - */ - public static final int TOO_LARGE_TO_BE_BOUNDED = 10_000_000; - private Queues() { // avoid direct instantiation } @@ -42,6 +39,14 @@ public static Queue createSpscUnboundedArrayQueue(int size) { } } + public static Queue createSpscChunkedArrayQueue(int size) { + if (Infrastructure.useUnsafeForQueues()) { + return new SpscChunkedUnpaddedArrayQueue<>(size); + } else { + return new SpscChunkedAtomicArrayQueue<>(size); + } + } + public static Supplier> getXsQueueSupplier() { return () -> createSpscArrayQueue(Infrastructure.getBufferSizeXs()); } @@ -72,12 +77,7 @@ public static Supplier> get(int bufferSize) { return EmptyQueue::new; } - final int computedSize = Math.max(8, bufferSize); - if (computedSize > TOO_LARGE_TO_BE_BOUNDED) { - return () -> createSpscUnboundedArrayQueue(Infrastructure.getBufferSizeS()); - } else { - return () -> createSpscArrayQueue(computedSize); - } + return () -> createSpscChunkedArrayQueue(bufferSize); } /** @@ -107,9 +107,9 @@ public static Supplier> unbounded(int size) { */ public static Queue createMpscQueue() { if (Infrastructure.useUnsafeForQueues()) { - return new MpscLinkedUnpaddedQueue<>(); + return new MpscUnboundedUnpaddedArrayQueue<>(Infrastructure.getBufferSizeS()); } else { - return new MpscLinkedAtomicQueue<>(); + return new MpscUnboundedAtomicArrayQueue<>(Infrastructure.getBufferSizeS()); } }