diff --git a/implementation/pom.xml b/implementation/pom.xml
index 4fd45f81e..210473424 100644
--- a/implementation/pom.xml
+++ b/implementation/pom.xml
@@ -23,6 +23,10 @@
smallrye-common-annotation${smallrye-common-annotation.version}
+
+ io.github.jponge.jct
+ jctools-core
+ io.reactivex.rxjava3
diff --git a/implementation/revapi.json b/implementation/revapi.json
index 2c593c3f8..22eef910e 100644
--- a/implementation/revapi.json
+++ b/implementation/revapi.json
@@ -57,6 +57,48 @@
"code": "java.class.removed",
"old": "class io.smallrye.mutiny.operators.multi.MultiConcatMapOp.ConcatMapMainSubscriber",
"justification": "Internal API refactoring"
+ },
+ {
+ "ignore": true,
+ "code": "java.method.removed",
+ "old": "method java.util.Queue io.smallrye.mutiny.helpers.queues.Queues::createStrictSizeQueue(int)",
+ "justification": "Refactoring of internal APIs"
+ },
+ {
+ "ignore": true,
+ "code": "java.field.removedWithConstant",
+ "old": "field io.smallrye.mutiny.helpers.queues.Queues.TO_LARGE_TO_BE_BOUNDED",
+ "justification": "Typo (internal API)"
+ },
+ {
+ "ignore": true,
+ "code": "java.class.removed",
+ "old": "class io.smallrye.mutiny.helpers.queues.MpscLinkedQueue",
+ "justification": "Refactoring of internal APIs"
+ },
+ {
+ "ignore": true,
+ "code": "java.class.removed",
+ "old": "class io.smallrye.mutiny.helpers.queues.SpscArrayQueue",
+ "justification": "Refactoring of internal APIs"
+ },
+ {
+ "ignore": true,
+ "code": "java.class.removed",
+ "old": "class io.smallrye.mutiny.helpers.queues.SpscLinkedArrayQueue",
+ "justification": "Refactoring of internal APIs"
+ },
+ {
+ "ignore": true,
+ "code": "java.field.removed",
+ "old": "field io.smallrye.mutiny.helpers.queues.Queues.BUFFER_S",
+ "justification": "Refactoring of internal APIs"
+ },
+ {
+ "ignore": true,
+ "code": "java.field.removed",
+ "old": "field io.smallrye.mutiny.helpers.queues.Queues.BUFFER_XS",
+ "justification": "Refactoring of internal APIs"
}
]
}
diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiFlatten.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiFlatten.java
index 627a255ff..f97a86529 100644
--- a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiFlatten.java
+++ b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiFlatten.java
@@ -8,7 +8,6 @@
import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.CompositeException;
import io.smallrye.mutiny.Multi;
-import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.multi.MultiConcatMapOp;
import io.smallrye.mutiny.operators.multi.MultiFlatMapOp;
@@ -77,7 +76,7 @@ public MultiFlatten withRequests(int requests) {
*/
@CheckReturnValue
public Multi merge() {
- return merge(Queues.BUFFER_S);
+ return merge(Infrastructure.getBufferSizeS());
}
/**
diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/MpscLinkedQueue.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/MpscLinkedQueue.java
deleted file mode 100644
index 936649902..000000000
--- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/MpscLinkedQueue.java
+++ /dev/null
@@ -1,242 +0,0 @@
-package io.smallrye.mutiny.helpers.queues;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * A multi-producer single consumer unbounded queue.
- * Code from RX Java 2.
- *
- * @param the contained value type
- */
-public final class MpscLinkedQueue implements Queue {
- private final AtomicReference> producerNode;
- private final AtomicReference> consumerNode;
-
- public MpscLinkedQueue() {
- producerNode = new AtomicReference<>();
- consumerNode = new AtomicReference<>();
- LinkedQueueNode node = new LinkedQueueNode<>();
- spConsumerNode(node);
- xchgProducerNode(node); // this ensures correct construction: StoreLoad
- }
-
- @Override
- public boolean add(T t) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean remove(Object o) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean containsAll(Collection> c) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean addAll(Collection extends T> c) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean removeAll(Collection> c) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean retainAll(Collection> c) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * IMPLEMENTATION NOTES:
- * Offer is allowed from multiple threads.
- * Offer allocates a new node and:
- *
- *
Swaps it atomically with current producer node (only one producer 'wins')
- *
Sets the new node as the node following from the swapped producer node
- *
- * This works because each producer is guaranteed to 'plant' a new node and link the old node. No 2 producers can
- * get the same producer node as part of XCHG guarantee.
- *
- * @see java.util.Queue#offer(Object)
- */
- @Override
- public boolean offer(final T e) {
- if (null == e) {
- throw new NullPointerException("Null is not a valid element");
- }
- final LinkedQueueNode nextNode = new LinkedQueueNode<>(e);
- final LinkedQueueNode prevProducerNode = xchgProducerNode(nextNode);
- // Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed
- // and completes the store in prev.next.
- prevProducerNode.soNext(nextNode); // StoreStore
- return true;
- }
-
- @Override
- public T remove() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * IMPLEMENTATION NOTES:
- * Poll is allowed from a SINGLE thread.
- * Poll reads the next node from the consumerNode and:
- *
- *
If it is null, the queue is assumed empty (though it might not be).
- *
If it is not null set it as the consumer node and return it's now evacuated value.
- *
- * This means the consumerNode.value is always null, which is also the starting point for the queue. Because null
- * values are not allowed to be offered this is the only node with it's value set to null at any one time.
- *
- * @see java.util.Queue#poll()
- */
- @Override
- public T poll() {
- LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright
- LinkedQueueNode nextNode = currConsumerNode.lvNext();
- if (nextNode != null) {
- // we have to null out the value because we are going to hang on to the node
- final T nextValue = nextNode.getAndNullValue();
- spConsumerNode(nextNode);
- return nextValue;
- } else if (currConsumerNode != lvProducerNode()) {
- // spin, we are no longer wait free
- //noinspection StatementWithEmptyBody
- while ((nextNode = currConsumerNode.lvNext()) == null) {
- } // got the next node...
-
- // we have to null out the value because we are going to hang on to the node
- final T nextValue = nextNode.getAndNullValue();
- spConsumerNode(nextNode);
- return nextValue;
- }
- return null;
- }
-
- @Override
- public T element() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public T peek() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void clear() {
- //noinspection StatementWithEmptyBody
- while (poll() != null && !isEmpty()) {
- }
- }
-
- LinkedQueueNode lvProducerNode() {
- return producerNode.get();
- }
-
- LinkedQueueNode xchgProducerNode(LinkedQueueNode node) {
- return producerNode.getAndSet(node);
- }
-
- LinkedQueueNode lvConsumerNode() {
- return consumerNode.get();
- }
-
- LinkedQueueNode lpConsumerNode() {
- return consumerNode.get();
- }
-
- void spConsumerNode(LinkedQueueNode node) {
- consumerNode.lazySet(node);
- }
-
- @Override
- public int size() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * IMPLEMENTATION NOTES:
- * Queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to observe
- * the producerNode.value is null, which also means an empty queue because only the consumerNode.value is allowed to
- * be null.
- */
- @Override
- public boolean isEmpty() {
- return lvConsumerNode() == lvProducerNode();
- }
-
- @Override
- public boolean contains(Object o) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterator iterator() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Object[] toArray() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public T1[] toArray(T1[] a) {
- throw new UnsupportedOperationException();
- }
-
- static final class LinkedQueueNode extends AtomicReference> {
-
- private static final long serialVersionUID = 2404266111789071508L;
-
- private E value;
-
- LinkedQueueNode() {
- }
-
- LinkedQueueNode(E val) {
- spValue(val);
- }
-
- /**
- * Gets the current value and nulls out the reference to it from this node.
- *
- * @return value
- */
- public E getAndNullValue() {
- E temp = lpValue();
- spValue(null);
- return temp;
- }
-
- public E lpValue() {
- return value;
- }
-
- public void spValue(E newValue) {
- value = newValue;
- }
-
- public void soNext(LinkedQueueNode n) {
- lazySet(n);
- }
-
- public LinkedQueueNode lvNext() {
- return get();
- }
- }
-}
diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java
index 1d5c121ce..792bee2ee 100644
--- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java
+++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java
@@ -1,38 +1,36 @@
package io.smallrye.mutiny.helpers.queues;
import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Supplier;
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class Queues {
+import org.jctools.queues.unpadded.MpscUnboundedUnpaddedArrayQueue;
+import org.jctools.queues.unpadded.MpscUnpaddedArrayQueue;
+import org.jctools.queues.unpadded.SpscChunkedUnpaddedArrayQueue;
+import org.jctools.queues.unpadded.SpscUnboundedUnpaddedArrayQueue;
+import org.jctools.queues.unpadded.SpscUnpaddedArrayQueue;
- /**
- * Queues with a requested with a capacity greater than this value are unbounded.
- */
- public static final int TO_LARGE_TO_BE_BOUNDED = 10_000_000;
+import io.smallrye.mutiny.infrastructure.Infrastructure;
+
+public class Queues {
private Queues() {
// avoid direct instantiation
}
- public static final int BUFFER_XS = Math.max(8,
- Integer.parseInt(System.getProperty("mutiny.buffer-size.xs", "32")));
-
- public static final int BUFFER_S = Math.max(16,
- Integer.parseInt(System.getProperty("mutiny.buffer-size.s", "256")));
-
- static final Supplier EMPTY_QUEUE_SUPPLIER = EmptyQueue::new;
- static final Supplier SINGLETON_QUEUE_SUPPLIER = SingletonQueue::new;
+ public static Queue createSpscArrayQueue(int capacity) {
+ return new SpscUnpaddedArrayQueue<>(capacity);
+ }
- static final Supplier XS_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(BUFFER_XS);
- static final Supplier S_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(BUFFER_S);
+ public static Queue createSpscUnboundedArrayQueue(int chunkSize) {
+ return new SpscUnboundedUnpaddedArrayQueue<>(chunkSize);
+ }
- static final Supplier UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscLinkedArrayQueue<>(BUFFER_S);
- static final Supplier XS_UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscLinkedArrayQueue<>(BUFFER_XS);
+ public static Queue createSpscChunkedArrayQueue(int capacity) {
+ return new SpscChunkedUnpaddedArrayQueue<>(capacity);
+ }
public static Supplier> getXsQueueSupplier() {
- return (Supplier>) XS_QUEUE_SUPPLIER;
+ return () -> createSpscArrayQueue(Infrastructure.getBufferSizeXs());
}
/**
@@ -40,51 +38,45 @@ public static Supplier> getXsQueueSupplier() {
*
* The type of the queue and configuration is computed based on the given buffer size.
*
- * @param bufferSize the buffer size
+ * @param capacity the buffer size
* @param the type of element
* @return the supplier.
*/
- public static Supplier> get(int bufferSize) {
- if (bufferSize == BUFFER_XS) {
- return XS_QUEUE_SUPPLIER;
+ public static Supplier> get(int capacity) {
+ if (capacity == Infrastructure.getBufferSizeXs()) {
+ return () -> createSpscArrayQueue(Infrastructure.getBufferSizeXs());
}
- if (bufferSize == BUFFER_S) {
- return S_QUEUE_SUPPLIER;
+ if (capacity == Infrastructure.getBufferSizeS()) {
+ return () -> createSpscArrayQueue(Infrastructure.getBufferSizeS());
}
- if (bufferSize == 1) {
- return SINGLETON_QUEUE_SUPPLIER;
+ if (capacity == 1) {
+ return SingletonQueue::new;
}
- if (bufferSize == 0) {
- return EMPTY_QUEUE_SUPPLIER;
+ if (capacity == 0) {
+ return EmptyQueue::new;
}
- final int computedSize = Math.max(8, bufferSize);
- if (computedSize > TO_LARGE_TO_BE_BOUNDED) {
- return UNBOUNDED_QUEUE_SUPPLIER;
- } else {
- return () -> new SpscArrayQueue<>(computedSize);
- }
+ return () -> createSpscChunkedArrayQueue(capacity);
}
/**
* Returns an unbounded Queue.
* The queue is array-backed. Each array has the given size. If the queue is full, new arrays can be allocated.
*
- * @param size the size of the array
+ * @param chunkSize the size of the array
* @param the type of item
* @return the unbound queue supplier
*/
- @SuppressWarnings("unchecked")
- public static Supplier> unbounded(int size) {
- if (size == BUFFER_XS) {
- return XS_UNBOUNDED_QUEUE_SUPPLIER;
- } else if (size == Integer.MAX_VALUE || size == BUFFER_S) {
- return UNBOUNDED_QUEUE_SUPPLIER;
+ public static Supplier> unbounded(int chunkSize) {
+ if (chunkSize == Infrastructure.getBufferSizeXs()) {
+ return () -> createSpscUnboundedArrayQueue(Infrastructure.getBufferSizeXs());
+ } else if (chunkSize == Integer.MAX_VALUE || chunkSize == Infrastructure.getBufferSizeS()) {
+ return () -> createSpscUnboundedArrayQueue(Infrastructure.getBufferSizeS());
} else {
- return () -> new SpscLinkedArrayQueue<>(size);
+ return () -> createSpscUnboundedArrayQueue(chunkSize);
}
}
@@ -95,17 +87,28 @@ public static Supplier> unbounded(int size) {
* @return the queue
*/
public static Queue createMpscQueue() {
- return new MpscLinkedQueue<>();
+ return new MpscUnboundedUnpaddedArrayQueue<>(Infrastructure.getBufferSizeS());
+ }
+
+ /**
+ * Creates an unbounded single producer / single consumer queue.
+ *
+ * @param chunkSize the chunk size
+ * @param the item type
+ * @return the queue
+ */
+ public static Queue createSpscUnboundedQueue(int chunkSize) {
+ return new SpscUnboundedUnpaddedArrayQueue<>(chunkSize);
}
/**
- * Create a queue of a strict fixed size.
+ * Create a MPSC queue with a given size
*
- * @param size the queue size
+ * @param capacity the queue size, will be rounded
* @param the elements type
* @return a new queue
*/
- public static Queue createStrictSizeQueue(int size) {
- return new ArrayBlockingQueue<>(size);
+ public static Queue createMpscArrayQueue(int capacity) {
+ return new MpscUnpaddedArrayQueue<>(capacity);
}
}
diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscArrayQueue.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscArrayQueue.java
deleted file mode 100755
index f2153ccb9..000000000
--- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscArrayQueue.java
+++ /dev/null
@@ -1,192 +0,0 @@
-package io.smallrye.mutiny.helpers.queues;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReferenceArray;
-
-/**
- * A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer.
- *
- * Code inspired from https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic,
- * and it's RX Java 2 version.
- *
- * @param the element type of the queue
- */
-public final class SpscArrayQueue extends AtomicReferenceArray implements Queue {
- private static final Integer MAX_LOOK_AHEAD_STEP = 4096;
- private final int mask;
- private final AtomicLong producerIndex = new AtomicLong();
- private long producerLookAhead;
- private final AtomicLong consumerIndex = new AtomicLong();
- private final int lookAheadStep;
-
- public SpscArrayQueue(int capacity) {
- super(roundToPowerOfTwo(capacity));
- this.mask = length() - 1;
- this.lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP);
- }
-
- /**
- * Find the next larger positive power of two value up from the given value. If value is a power of two then
- * this value will be returned.
- *
- * @param value from which next positive power of two will be found.
- * @return the next positive power of 2 or this value if it is a power of 2.
- */
- public static int roundToPowerOfTwo(final int value) {
- return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
- }
-
- @Override
- public boolean offer(E e) {
- if (null == e) {
- throw new NullPointerException("Null is not a valid element");
- }
- // local load of field to avoid repeated loads after volatile reads
- final int mask = this.mask;
- final long index = producerIndex.get();
- final int offset = calcElementOffset(index, mask);
- if (index >= producerLookAhead) {
- int step = lookAheadStep;
- if (null == lvElement(calcElementOffset(index + step, mask))) { // LoadLoad
- producerLookAhead = index + step;
- } else if (null != lvElement(offset)) {
- return false;
- }
- }
- soElement(offset, e); // StoreStore
- soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
- return true;
- }
-
- @Override
- public E poll() {
- final long index = consumerIndex.get();
- final int offset = calcElementOffset(index);
- // local load of field to avoid repeated loads after volatile reads
- final E e = lvElement(offset); // LoadLoad
- if (null == e) {
- return null;
- }
- soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
- soElement(offset, null); // StoreStore
- return e;
- }
-
- @Override
- public int size() {
- long ci = consumerIndex.get();
- for (;;) {
- long pi = producerIndex.get();
- long ci2 = consumerIndex.get();
- if (ci == ci2) {
- return (int) (pi - ci);
- }
- ci = ci2;
- }
- }
-
- public E peek() {
- int offset = (int) consumerIndex.get() & mask;
- return get(offset);
- }
-
- @Override
- public boolean isEmpty() {
- return producerIndex.get() == consumerIndex.get();
- }
-
- void soProducerIndex(long newIndex) {
- producerIndex.lazySet(newIndex);
- }
-
- void soConsumerIndex(long newIndex) {
- consumerIndex.lazySet(newIndex);
- }
-
- @Override
- public void clear() {
- // we have to test isEmpty because of the weaker poll() guarantee
- //noinspection StatementWithEmptyBody
- while (poll() != null || !isEmpty()) {
- }
- }
-
- int calcElementOffset(long index, int mask) {
- return (int) index & mask;
- }
-
- int calcElementOffset(long index) {
- return (int) index & mask;
- }
-
- void soElement(int offset, E value) {
- lazySet(offset, value);
- }
-
- E lvElement(int offset) {
- return get(offset);
- }
-
- @Override
- public boolean contains(Object o) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterator iterator() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Object[] toArray() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public R[] toArray(R[] a) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean remove(Object o) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean containsAll(Collection> c) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean addAll(Collection extends E> c) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean removeAll(Collection> c) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean retainAll(Collection> c) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean add(E e) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public E remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public E element() {
- throw new UnsupportedOperationException();
- }
-}
diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscLinkedArrayQueue.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscLinkedArrayQueue.java
deleted file mode 100644
index 99d6a6aaf..000000000
--- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscLinkedArrayQueue.java
+++ /dev/null
@@ -1,288 +0,0 @@
-package io.smallrye.mutiny.helpers.queues;
-
-import java.util.AbstractQueue;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReferenceArray;
-
-/**
- * A single-producer single-consumer array-backed queue which can allocate new arrays in case the consumer is slower
- * than the producer.
- *
- * Code inspired from https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic,
- * and it's RX Java 2 version.
- *
- * @param the element type of the queue
- */
-public final class SpscLinkedArrayQueue extends AbstractQueue implements Queue {
- private static final int MAX_LOOK_AHEAD_STEP = 4096;
- private final AtomicLong producerIndex = new AtomicLong();
-
- private int producerLookAheadStep;
- private long producerLookAhead;
-
- private final int producerMask;
-
- private AtomicReferenceArray