diff --git a/implementation/pom.xml b/implementation/pom.xml index 4fd45f81e..854abf913 100644 --- a/implementation/pom.xml +++ b/implementation/pom.xml @@ -23,6 +23,10 @@ smallrye-common-annotation ${smallrye-common-annotation.version} + + org.jctools + jctools-core + io.reactivex.rxjava3 diff --git a/implementation/revapi.json b/implementation/revapi.json index d37de1ac5..22eef910e 100644 --- a/implementation/revapi.json +++ b/implementation/revapi.json @@ -51,7 +51,56 @@ "criticality" : "highlight", "minSeverity" : "POTENTIALLY_BREAKING", "minCriticality" : "documented", - "differences" : [ ] + "differences" : [ + { + "ignore": true, + "code": "java.class.removed", + "old": "class io.smallrye.mutiny.operators.multi.MultiConcatMapOp.ConcatMapMainSubscriber", + "justification": "Internal API refactoring" + }, + { + "ignore": true, + "code": "java.method.removed", + "old": "method java.util.Queue io.smallrye.mutiny.helpers.queues.Queues::createStrictSizeQueue(int)", + "justification": "Refactoring of internal APIs" + }, + { + "ignore": true, + "code": "java.field.removedWithConstant", + "old": "field io.smallrye.mutiny.helpers.queues.Queues.TO_LARGE_TO_BE_BOUNDED", + "justification": "Typo (internal API)" + }, + { + "ignore": true, + "code": "java.class.removed", + "old": "class io.smallrye.mutiny.helpers.queues.MpscLinkedQueue", + "justification": "Refactoring of internal APIs" + }, + { + "ignore": true, + "code": "java.class.removed", + "old": "class io.smallrye.mutiny.helpers.queues.SpscArrayQueue", + "justification": "Refactoring of internal APIs" + }, + { + "ignore": true, + "code": "java.class.removed", + "old": "class io.smallrye.mutiny.helpers.queues.SpscLinkedArrayQueue", + "justification": "Refactoring of internal APIs" + }, + { + "ignore": true, + "code": "java.field.removed", + "old": "field io.smallrye.mutiny.helpers.queues.Queues.BUFFER_S", + "justification": "Refactoring of internal APIs" + }, + { + "ignore": true, + "code": "java.field.removed", + "old": "field io.smallrye.mutiny.helpers.queues.Queues.BUFFER_XS", + "justification": "Refactoring of internal APIs" + } + ] } }, { "extension" : "revapi.reporter.json", diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiFlatten.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiFlatten.java index 627a255ff..f97a86529 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiFlatten.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiFlatten.java @@ -8,7 +8,6 @@ import io.smallrye.common.annotation.CheckReturnValue; import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.helpers.queues.Queues; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.operators.multi.MultiConcatMapOp; import io.smallrye.mutiny.operators.multi.MultiFlatMapOp; @@ -77,7 +76,7 @@ public MultiFlatten withRequests(int requests) { */ @CheckReturnValue public Multi merge() { - return merge(Queues.BUFFER_S); + return merge(Infrastructure.getBufferSizeS()); } /** diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/MpscLinkedQueue.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/MpscLinkedQueue.java deleted file mode 100644 index 936649902..000000000 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/MpscLinkedQueue.java +++ /dev/null @@ -1,242 +0,0 @@ -package io.smallrye.mutiny.helpers.queues; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicReference; - -/** - * A multi-producer single consumer unbounded queue. - * Code from RX Java 2. - * - * @param the contained value type - */ -public final class MpscLinkedQueue implements Queue { - private final AtomicReference> producerNode; - private final AtomicReference> consumerNode; - - public MpscLinkedQueue() { - producerNode = new AtomicReference<>(); - consumerNode = new AtomicReference<>(); - LinkedQueueNode node = new LinkedQueueNode<>(); - spConsumerNode(node); - xchgProducerNode(node); // this ensures correct construction: StoreLoad - } - - @Override - public boolean add(T t) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean remove(Object o) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean containsAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean addAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean removeAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean retainAll(Collection c) { - throw new UnsupportedOperationException(); - } - - /** - * {@inheritDoc}
- *

- * IMPLEMENTATION NOTES:
- * Offer is allowed from multiple threads.
- * Offer allocates a new node and: - *

    - *
  1. Swaps it atomically with current producer node (only one producer 'wins') - *
  2. Sets the new node as the node following from the swapped producer node - *
- * This works because each producer is guaranteed to 'plant' a new node and link the old node. No 2 producers can - * get the same producer node as part of XCHG guarantee. - * - * @see java.util.Queue#offer(Object) - */ - @Override - public boolean offer(final T e) { - if (null == e) { - throw new NullPointerException("Null is not a valid element"); - } - final LinkedQueueNode nextNode = new LinkedQueueNode<>(e); - final LinkedQueueNode prevProducerNode = xchgProducerNode(nextNode); - // Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed - // and completes the store in prev.next. - prevProducerNode.soNext(nextNode); // StoreStore - return true; - } - - @Override - public T remove() { - throw new UnsupportedOperationException(); - } - - /** - * {@inheritDoc}
- *

- * IMPLEMENTATION NOTES:
- * Poll is allowed from a SINGLE thread.
- * Poll reads the next node from the consumerNode and: - *

    - *
  1. If it is null, the queue is assumed empty (though it might not be). - *
  2. If it is not null set it as the consumer node and return it's now evacuated value. - *
- * This means the consumerNode.value is always null, which is also the starting point for the queue. Because null - * values are not allowed to be offered this is the only node with it's value set to null at any one time. - * - * @see java.util.Queue#poll() - */ - @Override - public T poll() { - LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright - LinkedQueueNode nextNode = currConsumerNode.lvNext(); - if (nextNode != null) { - // we have to null out the value because we are going to hang on to the node - final T nextValue = nextNode.getAndNullValue(); - spConsumerNode(nextNode); - return nextValue; - } else if (currConsumerNode != lvProducerNode()) { - // spin, we are no longer wait free - //noinspection StatementWithEmptyBody - while ((nextNode = currConsumerNode.lvNext()) == null) { - } // got the next node... - - // we have to null out the value because we are going to hang on to the node - final T nextValue = nextNode.getAndNullValue(); - spConsumerNode(nextNode); - return nextValue; - } - return null; - } - - @Override - public T element() { - throw new UnsupportedOperationException(); - } - - @Override - public T peek() { - throw new UnsupportedOperationException(); - } - - @Override - public void clear() { - //noinspection StatementWithEmptyBody - while (poll() != null && !isEmpty()) { - } - } - - LinkedQueueNode lvProducerNode() { - return producerNode.get(); - } - - LinkedQueueNode xchgProducerNode(LinkedQueueNode node) { - return producerNode.getAndSet(node); - } - - LinkedQueueNode lvConsumerNode() { - return consumerNode.get(); - } - - LinkedQueueNode lpConsumerNode() { - return consumerNode.get(); - } - - void spConsumerNode(LinkedQueueNode node) { - consumerNode.lazySet(node); - } - - @Override - public int size() { - throw new UnsupportedOperationException(); - } - - /** - * {@inheritDoc}
- *

- * IMPLEMENTATION NOTES:
- * Queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to observe - * the producerNode.value is null, which also means an empty queue because only the consumerNode.value is allowed to - * be null. - */ - @Override - public boolean isEmpty() { - return lvConsumerNode() == lvProducerNode(); - } - - @Override - public boolean contains(Object o) { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - - @Override - public Object[] toArray() { - throw new UnsupportedOperationException(); - } - - @Override - public T1[] toArray(T1[] a) { - throw new UnsupportedOperationException(); - } - - static final class LinkedQueueNode extends AtomicReference> { - - private static final long serialVersionUID = 2404266111789071508L; - - private E value; - - LinkedQueueNode() { - } - - LinkedQueueNode(E val) { - spValue(val); - } - - /** - * Gets the current value and nulls out the reference to it from this node. - * - * @return value - */ - public E getAndNullValue() { - E temp = lpValue(); - spValue(null); - return temp; - } - - public E lpValue() { - return value; - } - - public void spValue(E newValue) { - value = newValue; - } - - public void soNext(LinkedQueueNode n) { - lazySet(n); - } - - public LinkedQueueNode lvNext() { - return get(); - } - } -} diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java index 1d5c121ce..792bee2ee 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java @@ -1,38 +1,36 @@ package io.smallrye.mutiny.helpers.queues; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; import java.util.function.Supplier; -@SuppressWarnings({ "rawtypes", "unchecked" }) -public class Queues { +import org.jctools.queues.unpadded.MpscUnboundedUnpaddedArrayQueue; +import org.jctools.queues.unpadded.MpscUnpaddedArrayQueue; +import org.jctools.queues.unpadded.SpscChunkedUnpaddedArrayQueue; +import org.jctools.queues.unpadded.SpscUnboundedUnpaddedArrayQueue; +import org.jctools.queues.unpadded.SpscUnpaddedArrayQueue; - /** - * Queues with a requested with a capacity greater than this value are unbounded. - */ - public static final int TO_LARGE_TO_BE_BOUNDED = 10_000_000; +import io.smallrye.mutiny.infrastructure.Infrastructure; + +public class Queues { private Queues() { // avoid direct instantiation } - public static final int BUFFER_XS = Math.max(8, - Integer.parseInt(System.getProperty("mutiny.buffer-size.xs", "32"))); - - public static final int BUFFER_S = Math.max(16, - Integer.parseInt(System.getProperty("mutiny.buffer-size.s", "256"))); - - static final Supplier EMPTY_QUEUE_SUPPLIER = EmptyQueue::new; - static final Supplier SINGLETON_QUEUE_SUPPLIER = SingletonQueue::new; + public static Queue createSpscArrayQueue(int capacity) { + return new SpscUnpaddedArrayQueue<>(capacity); + } - static final Supplier XS_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(BUFFER_XS); - static final Supplier S_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(BUFFER_S); + public static Queue createSpscUnboundedArrayQueue(int chunkSize) { + return new SpscUnboundedUnpaddedArrayQueue<>(chunkSize); + } - static final Supplier UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscLinkedArrayQueue<>(BUFFER_S); - static final Supplier XS_UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscLinkedArrayQueue<>(BUFFER_XS); + public static Queue createSpscChunkedArrayQueue(int capacity) { + return new SpscChunkedUnpaddedArrayQueue<>(capacity); + } public static Supplier> getXsQueueSupplier() { - return (Supplier>) XS_QUEUE_SUPPLIER; + return () -> createSpscArrayQueue(Infrastructure.getBufferSizeXs()); } /** @@ -40,51 +38,45 @@ public static Supplier> getXsQueueSupplier() { *

* The type of the queue and configuration is computed based on the given buffer size. * - * @param bufferSize the buffer size + * @param capacity the buffer size * @param the type of element * @return the supplier. */ - public static Supplier> get(int bufferSize) { - if (bufferSize == BUFFER_XS) { - return XS_QUEUE_SUPPLIER; + public static Supplier> get(int capacity) { + if (capacity == Infrastructure.getBufferSizeXs()) { + return () -> createSpscArrayQueue(Infrastructure.getBufferSizeXs()); } - if (bufferSize == BUFFER_S) { - return S_QUEUE_SUPPLIER; + if (capacity == Infrastructure.getBufferSizeS()) { + return () -> createSpscArrayQueue(Infrastructure.getBufferSizeS()); } - if (bufferSize == 1) { - return SINGLETON_QUEUE_SUPPLIER; + if (capacity == 1) { + return SingletonQueue::new; } - if (bufferSize == 0) { - return EMPTY_QUEUE_SUPPLIER; + if (capacity == 0) { + return EmptyQueue::new; } - final int computedSize = Math.max(8, bufferSize); - if (computedSize > TO_LARGE_TO_BE_BOUNDED) { - return UNBOUNDED_QUEUE_SUPPLIER; - } else { - return () -> new SpscArrayQueue<>(computedSize); - } + return () -> createSpscChunkedArrayQueue(capacity); } /** * Returns an unbounded Queue. * The queue is array-backed. Each array has the given size. If the queue is full, new arrays can be allocated. * - * @param size the size of the array + * @param chunkSize the size of the array * @param the type of item * @return the unbound queue supplier */ - @SuppressWarnings("unchecked") - public static Supplier> unbounded(int size) { - if (size == BUFFER_XS) { - return XS_UNBOUNDED_QUEUE_SUPPLIER; - } else if (size == Integer.MAX_VALUE || size == BUFFER_S) { - return UNBOUNDED_QUEUE_SUPPLIER; + public static Supplier> unbounded(int chunkSize) { + if (chunkSize == Infrastructure.getBufferSizeXs()) { + return () -> createSpscUnboundedArrayQueue(Infrastructure.getBufferSizeXs()); + } else if (chunkSize == Integer.MAX_VALUE || chunkSize == Infrastructure.getBufferSizeS()) { + return () -> createSpscUnboundedArrayQueue(Infrastructure.getBufferSizeS()); } else { - return () -> new SpscLinkedArrayQueue<>(size); + return () -> createSpscUnboundedArrayQueue(chunkSize); } } @@ -95,17 +87,28 @@ public static Supplier> unbounded(int size) { * @return the queue */ public static Queue createMpscQueue() { - return new MpscLinkedQueue<>(); + return new MpscUnboundedUnpaddedArrayQueue<>(Infrastructure.getBufferSizeS()); + } + + /** + * Creates an unbounded single producer / single consumer queue. + * + * @param chunkSize the chunk size + * @param the item type + * @return the queue + */ + public static Queue createSpscUnboundedQueue(int chunkSize) { + return new SpscUnboundedUnpaddedArrayQueue<>(chunkSize); } /** - * Create a queue of a strict fixed size. + * Create a MPSC queue with a given size * - * @param size the queue size + * @param capacity the queue size, will be rounded * @param the elements type * @return a new queue */ - public static Queue createStrictSizeQueue(int size) { - return new ArrayBlockingQueue<>(size); + public static Queue createMpscArrayQueue(int capacity) { + return new MpscUnpaddedArrayQueue<>(capacity); } } diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscArrayQueue.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscArrayQueue.java deleted file mode 100755 index f2153ccb9..000000000 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscArrayQueue.java +++ /dev/null @@ -1,192 +0,0 @@ -package io.smallrye.mutiny.helpers.queues; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReferenceArray; - -/** - * A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer. - *

- * Code inspired from https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic, - * and it's RX Java 2 version. - * - * @param the element type of the queue - */ -public final class SpscArrayQueue extends AtomicReferenceArray implements Queue { - private static final Integer MAX_LOOK_AHEAD_STEP = 4096; - private final int mask; - private final AtomicLong producerIndex = new AtomicLong(); - private long producerLookAhead; - private final AtomicLong consumerIndex = new AtomicLong(); - private final int lookAheadStep; - - public SpscArrayQueue(int capacity) { - super(roundToPowerOfTwo(capacity)); - this.mask = length() - 1; - this.lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP); - } - - /** - * Find the next larger positive power of two value up from the given value. If value is a power of two then - * this value will be returned. - * - * @param value from which next positive power of two will be found. - * @return the next positive power of 2 or this value if it is a power of 2. - */ - public static int roundToPowerOfTwo(final int value) { - return 1 << (32 - Integer.numberOfLeadingZeros(value - 1)); - } - - @Override - public boolean offer(E e) { - if (null == e) { - throw new NullPointerException("Null is not a valid element"); - } - // local load of field to avoid repeated loads after volatile reads - final int mask = this.mask; - final long index = producerIndex.get(); - final int offset = calcElementOffset(index, mask); - if (index >= producerLookAhead) { - int step = lookAheadStep; - if (null == lvElement(calcElementOffset(index + step, mask))) { // LoadLoad - producerLookAhead = index + step; - } else if (null != lvElement(offset)) { - return false; - } - } - soElement(offset, e); // StoreStore - soProducerIndex(index + 1); // ordered store -> atomic and ordered for size() - return true; - } - - @Override - public E poll() { - final long index = consumerIndex.get(); - final int offset = calcElementOffset(index); - // local load of field to avoid repeated loads after volatile reads - final E e = lvElement(offset); // LoadLoad - if (null == e) { - return null; - } - soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size() - soElement(offset, null); // StoreStore - return e; - } - - @Override - public int size() { - long ci = consumerIndex.get(); - for (;;) { - long pi = producerIndex.get(); - long ci2 = consumerIndex.get(); - if (ci == ci2) { - return (int) (pi - ci); - } - ci = ci2; - } - } - - public E peek() { - int offset = (int) consumerIndex.get() & mask; - return get(offset); - } - - @Override - public boolean isEmpty() { - return producerIndex.get() == consumerIndex.get(); - } - - void soProducerIndex(long newIndex) { - producerIndex.lazySet(newIndex); - } - - void soConsumerIndex(long newIndex) { - consumerIndex.lazySet(newIndex); - } - - @Override - public void clear() { - // we have to test isEmpty because of the weaker poll() guarantee - //noinspection StatementWithEmptyBody - while (poll() != null || !isEmpty()) { - } - } - - int calcElementOffset(long index, int mask) { - return (int) index & mask; - } - - int calcElementOffset(long index) { - return (int) index & mask; - } - - void soElement(int offset, E value) { - lazySet(offset, value); - } - - E lvElement(int offset) { - return get(offset); - } - - @Override - public boolean contains(Object o) { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - - @Override - public Object[] toArray() { - throw new UnsupportedOperationException(); - } - - @Override - public R[] toArray(R[] a) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean remove(Object o) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean containsAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean addAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean removeAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean retainAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean add(E e) { - throw new UnsupportedOperationException(); - } - - @Override - public E remove() { - throw new UnsupportedOperationException(); - } - - @Override - public E element() { - throw new UnsupportedOperationException(); - } -} diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscLinkedArrayQueue.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscLinkedArrayQueue.java deleted file mode 100644 index 99d6a6aaf..000000000 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscLinkedArrayQueue.java +++ /dev/null @@ -1,288 +0,0 @@ -package io.smallrye.mutiny.helpers.queues; - -import java.util.AbstractQueue; -import java.util.Iterator; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReferenceArray; - -/** - * A single-producer single-consumer array-backed queue which can allocate new arrays in case the consumer is slower - * than the producer. - *

- * Code inspired from https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic, - * and it's RX Java 2 version. - * - * @param the element type of the queue - */ -public final class SpscLinkedArrayQueue extends AbstractQueue implements Queue { - private static final int MAX_LOOK_AHEAD_STEP = 4096; - private final AtomicLong producerIndex = new AtomicLong(); - - private int producerLookAheadStep; - private long producerLookAhead; - - private final int producerMask; - - private AtomicReferenceArray producerBuffer; - private final int consumerMask; - private AtomicReferenceArray consumerBuffer; - private final AtomicLong consumerIndex = new AtomicLong(); - - private static final Object HAS_NEXT = new Object(); - - public SpscLinkedArrayQueue(final int bufferSize) { - int p2capacity = SpscArrayQueue.roundToPowerOfTwo(Math.max(8, bufferSize)); - int mask = p2capacity - 1; - AtomicReferenceArray buffer = new AtomicReferenceArray<>(p2capacity + 1); - producerBuffer = buffer; - producerMask = mask; - adjustLookAheadStep(p2capacity); - consumerBuffer = buffer; - consumerMask = mask; - producerLookAhead = mask - 1L; // we know it's all empty to start with - soProducerIndex(0L); - } - - /** - * {@inheritDoc} - *

- * This implementation is correct for single producer thread use only. - */ - @Override - public boolean offer(final E e) { - if (null == e) { - throw new NullPointerException("Null is not a valid element"); - } - // local load of field to avoid repeated loads after volatile reads - final AtomicReferenceArray buffer = producerBuffer; - final long index = lpProducerIndex(); - final int mask = producerMask; - final int offset = calcWrappedOffset(index, mask); - if (index < producerLookAhead) { - return writeToQueue(buffer, e, index, offset); - } else { - final int lookAheadStep = producerLookAheadStep; - // go around the buffer or resize if full (unless we hit max capacity) - int lookAheadElementOffset = calcWrappedOffset(index + lookAheadStep, mask); - if (null == lvElement(buffer, lookAheadElementOffset)) { // LoadLoad - producerLookAhead = index + lookAheadStep - 1; // joy, there's plenty of room - return writeToQueue(buffer, e, index, offset); - } else if (null == lvElement(buffer, calcWrappedOffset(index + 1, mask))) { // buffer is not full - return writeToQueue(buffer, e, index, offset); - } else { - resize(buffer, index, offset, e, mask); // add a buffer and link old to new - return true; - } - } - } - - private boolean writeToQueue(final AtomicReferenceArray buffer, final E e, final long index, - final int offset) { - soElement(buffer, offset, e); // StoreStore - soProducerIndex(index + 1); // this ensures atomic write of long on 32bit platforms - return true; - } - - private void resize(final AtomicReferenceArray oldBuffer, final long currIndex, final int offset, final E e, - final long mask) { - final int capacity = oldBuffer.length(); - final AtomicReferenceArray newBuffer = new AtomicReferenceArray<>(capacity); - producerBuffer = newBuffer; - producerLookAhead = currIndex + mask - 1; - soElement(newBuffer, offset, e); // StoreStore - soNext(oldBuffer, newBuffer); - soElement(oldBuffer, offset, HAS_NEXT); // new buffer is visible after element is - // inserted - soProducerIndex(currIndex + 1); // this ensures correctness on 32bit platforms - } - - private void soNext(AtomicReferenceArray curr, AtomicReferenceArray next) { - soElement(curr, calcDirectOffset(curr.length() - 1), next); - } - - @SuppressWarnings("unchecked") - private AtomicReferenceArray lvNextBufferAndUnlink(AtomicReferenceArray curr, int nextIndex) { - int nextOffset = calcDirectOffset(nextIndex); - AtomicReferenceArray nextBuffer = (AtomicReferenceArray) lvElement(curr, nextOffset); - soElement(curr, nextOffset, null); // Avoid GC nepotism - return nextBuffer; - } - - /** - * {@inheritDoc} - *

- * This implementation is correct for single consumer thread use only. - */ - @SuppressWarnings("unchecked") - @Override - public E poll() { - // local load of field to avoid repeated loads after volatile reads - final AtomicReferenceArray buffer = consumerBuffer; - final long index = lpConsumerIndex(); - final int mask = consumerMask; - final int offset = calcWrappedOffset(index, mask); - final Object e = lvElement(buffer, offset); // LoadLoad - boolean isNextBuffer = e == HAS_NEXT; - if (null != e && !isNextBuffer) { - soElement(buffer, offset, null); // StoreStore - soConsumerIndex(index + 1); // this ensures correctness on 32bit platforms - return (E) e; - } else if (isNextBuffer) { - return newBufferPoll(lvNextBufferAndUnlink(buffer, mask + 1), index, mask); - } - - return null; - } - - @SuppressWarnings("unchecked") - private E newBufferPoll(AtomicReferenceArray nextBuffer, final long index, final int mask) { - consumerBuffer = nextBuffer; - final int offsetInNew = calcWrappedOffset(index, mask); - final E n = (E) lvElement(nextBuffer, offsetInNew); // LoadLoad - if (null != n) { - soElement(nextBuffer, offsetInNew, null); // StoreStore - soConsumerIndex(index + 1); // this ensures correctness on 32bit platforms - } - return n; - } - - @SuppressWarnings("unchecked") - public E peek() { - final AtomicReferenceArray buffer = consumerBuffer; - final long index = lpConsumerIndex(); - final int mask = consumerMask; - final int offset = calcWrappedOffset(index, mask); - final Object e = lvElement(buffer, offset); // LoadLoad - if (e == HAS_NEXT) { - return newBufferPeek(lvNextBufferAndUnlink(buffer, mask + 1), index, mask); - } - - return (E) e; - } - - @SuppressWarnings("unchecked") - private E newBufferPeek(AtomicReferenceArray nextBuffer, final long index, final int mask) { - consumerBuffer = nextBuffer; - final int offsetInNew = calcWrappedOffset(index, mask); - return (E) lvElement(nextBuffer, offsetInNew); // LoadLoad - } - - @Override - public void clear() { - //noinspection StatementWithEmptyBody - while (poll() != null || !isEmpty()) { - } - } - - @Override - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - - public int size() { - /* - * It is possible for a thread to be interrupted or reschedule between the read of the producer and - * consumer indices, therefore protection is required to ensure size is within valid range. In the - * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer - * index BEFORE the producer index. - */ - long after = lvConsumerIndex(); - while (true) { - final long before = after; - final long currentProducerIndex = lvProducerIndex(); - after = lvConsumerIndex(); - if (before == after) { - return (int) (currentProducerIndex - after); - } - } - } - - @Override - public boolean isEmpty() { - return lvProducerIndex() == lvConsumerIndex(); - } - - private void adjustLookAheadStep(int capacity) { - producerLookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP); - } - - private long lvProducerIndex() { - return producerIndex.get(); - } - - private long lvConsumerIndex() { - return consumerIndex.get(); - } - - private long lpProducerIndex() { - return producerIndex.get(); - } - - private long lpConsumerIndex() { - return consumerIndex.get(); - } - - private void soProducerIndex(long v) { - producerIndex.lazySet(v); - } - - private void soConsumerIndex(long v) { - consumerIndex.lazySet(v); - } - - private static int calcWrappedOffset(long index, int mask) { - return calcDirectOffset((int) index & mask); - } - - private static int calcDirectOffset(int index) { - return index; - } - - private static void soElement(AtomicReferenceArray buffer, int offset, Object e) { - buffer.lazySet(offset, e); - } - - private static Object lvElement(AtomicReferenceArray buffer, int offset) { - return buffer.get(offset); - } - - /** - * Offer two elements at the same time. - *

- * Don't use the regular offer() with this at all! - * - * @param first the first value, not null - * @param second the second value, not null - * @return true if the queue accepted the two new values - */ - public boolean offer(E first, E second) { - final AtomicReferenceArray buffer = producerBuffer; - final long p = lvProducerIndex(); - final int m = producerMask; - - int pi = calcWrappedOffset(p + 2, m); - - if (null == lvElement(buffer, pi)) { - pi = calcWrappedOffset(p, m); - soElement(buffer, pi + 1, second); - soElement(buffer, pi, first); - soProducerIndex(p + 2); - } else { - final int capacity = buffer.length(); - final AtomicReferenceArray newBuffer = new AtomicReferenceArray<>(capacity); - producerBuffer = newBuffer; - - pi = calcWrappedOffset(p, m); - soElement(newBuffer, pi + 1, second); // StoreStore - soElement(newBuffer, pi, first); - soNext(buffer, newBuffer); - - soElement(buffer, pi, HAS_NEXT); // new buffer is visible after element is - - soProducerIndex(p + 2); // this ensures correctness on 32bit platforms - } - - return true; - } -} diff --git a/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java b/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java index 367c900f5..fc3ec5a13 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java +++ b/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java @@ -1,6 +1,7 @@ package io.smallrye.mutiny.infrastructure; import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; +import static io.smallrye.mutiny.helpers.ParameterValidation.positive; import java.util.ArrayList; import java.util.Comparator; @@ -29,7 +30,6 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.groups.MultiOverflowStrategy; -import io.smallrye.mutiny.helpers.ParameterValidation; import io.smallrye.mutiny.subscription.UniSubscriber; import io.smallrye.mutiny.tuples.Functions; @@ -66,12 +66,17 @@ public class Infrastructure { private static int multiOverflowDefaultBufferSize = 128; + private static int bufferSizeXs = 32; + private static int bufferSizeS = 256; + public static void reload() { clearInterceptors(); reloadUniInterceptors(); reloadMultiInterceptors(); reloadCallbackDecorators(); multiOverflowDefaultBufferSize = 128; + bufferSizeXs = 32; + bufferSizeS = 256; } /** @@ -315,7 +320,7 @@ public static boolean canCallerThreadBeBlocked() { * @param handler the handler, must not be {@code null} and must not throw an exception or it will also be lost. */ public static void setDroppedExceptionHandler(Consumer handler) { - ParameterValidation.nonNull(handler, "handler"); + nonNull(handler, "handler"); droppedExceptionHandler = handler; } @@ -404,7 +409,7 @@ public static void logFromOperator(String identifier, String event, Object value * @param operatorLogger the new operator logger */ public static void setOperatorLogger(OperatorLogger operatorLogger) { - Infrastructure.operatorLogger = ParameterValidation.nonNull(operatorLogger, "operatorLogger"); + Infrastructure.operatorLogger = nonNull(operatorLogger, "operatorLogger"); } // For testing purpose only @@ -427,7 +432,53 @@ public static int getMultiOverflowDefaultBufferSize() { * @param size the buffer size, must be strictly positive */ public static void setMultiOverflowDefaultBufferSize(int size) { - multiOverflowDefaultBufferSize = ParameterValidation.positive(size, "size"); + multiOverflowDefaultBufferSize = positive(size, "size"); + } + + /** + * Get the xs buffer size (for internal usage). + * + * @return the buffer size + */ + public static int getBufferSizeXs() { + String propVal = System.getProperty("mutiny.buffer-size.xs"); + if (propVal != null) { + return Math.max(8, Integer.parseInt(propVal)); + } else { + return bufferSizeXs; + } + } + + /** + * Set the xs buffer size (for internal usage). + * + * @param size the buffer size + */ + public static void setBufferSizeXs(int size) { + bufferSizeXs = positive(size, "size"); + } + + /** + * Get the s buffer size (for internal usage). + * + * @return the buffer size + */ + public static int getBufferSizeS() { + String propVal = System.getProperty("mutiny.buffer-size.s"); + if (propVal != null) { + return Math.max(16, Integer.parseInt(propVal)); + } else { + return bufferSizeS; + } + } + + /** + * Set the xs buffer size (for internal usage). + * + * @param size the buffer size + */ + public static void setBufferSizeS(int size) { + bufferSizeS = positive(size, "size"); } /** diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiCombineLatestOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiCombineLatestOp.java index 5a9fccba9..d3eb36116 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiCombineLatestOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiCombineLatestOp.java @@ -2,7 +2,12 @@ import static io.smallrye.mutiny.helpers.Subscriptions.CANCELLED; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Queue; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.atomic.AtomicInteger; @@ -13,7 +18,7 @@ import io.smallrye.mutiny.Context; import io.smallrye.mutiny.helpers.ParameterValidation; import io.smallrye.mutiny.helpers.Subscriptions; -import io.smallrye.mutiny.helpers.queues.SpscLinkedArrayQueue; +import io.smallrye.mutiny.helpers.queues.Queues; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.operators.MultiOperator; import io.smallrye.mutiny.subscription.ContextSupport; @@ -77,7 +82,7 @@ private static final class CombineLatestCoordinator implements Subscriptio private final MultiSubscriber downstream; private final Function, ? extends O> combinator; private final List> subscribers = new ArrayList<>(); - private final SpscLinkedArrayQueue queue; + private final Queue queue; private final Object[] latest; private final boolean delayErrors; @@ -106,7 +111,7 @@ private static final class CombineLatestCoordinator implements Subscriptio subscribers.add(new CombineLatestInnerSubscriber<>(context, this, i, bufferSize)); } this.latest = new Object[size]; - this.queue = new SpscLinkedArrayQueue<>(bufferSize); + this.queue = Queues.createSpscUnboundedQueue(bufferSize); this.delayErrors = delayErrors; } @@ -148,7 +153,8 @@ void innerValue(int index, I value) { } os[index] = value; if (os.length == localNonEmptySources) { - queue.offer(subscribers.get(index), os.clone()); + queue.offer(subscribers.get(index)); + queue.offer(os.clone()); replenishInsteadOfDrain = false; } else { replenishInsteadOfDrain = true; @@ -196,7 +202,7 @@ void innerError(int index, Throwable e) { @SuppressWarnings("unchecked") void drainAsync() { - final SpscLinkedArrayQueue q = queue; + final Queue q = queue; int missed = 1; diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java index 500ef766c..112b5c986 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java @@ -15,6 +15,7 @@ import io.smallrye.mutiny.helpers.ParameterValidation; import io.smallrye.mutiny.helpers.Subscriptions; import io.smallrye.mutiny.helpers.queues.Queues; +import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.subscription.BackPressureFailure; import io.smallrye.mutiny.subscription.MultiSubscriber; @@ -26,7 +27,7 @@ public class MultiEmitOnOp extends AbstractMultiOperator { private final Executor executor; - private final Supplier> queueSupplier = Queues.get(Queues.BUFFER_S); + private final Supplier> queueSupplier = Queues.get(Infrastructure.getBufferSizeS()); public MultiEmitOnOp(Multi upstream, Executor executor) { super(upstream); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiGroupByOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiGroupByOp.java index 7088932e8..7810a99c0 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiGroupByOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiGroupByOp.java @@ -69,7 +69,7 @@ public MultiGroupByProcessor(MultiSubscriber> downstr this.keySelector = keySelector; this.valueSelector = valueSelector; this.groups = groups; - this.queue = Queues.> unbounded(Queues.BUFFER_S).get(); + this.queue = Queues.> unbounded(Infrastructure.getBufferSizeS()).get(); } @Override @@ -314,7 +314,7 @@ private static final class State implements Flow.Subscription, Flow.Publis @SuppressWarnings("unchecked") State(MultiGroupByProcessor parent, K key) { this.parent = parent; - this.queue = (Queue) Queues.unbounded(Queues.BUFFER_S).get(); + this.queue = (Queue) Queues.unbounded(Infrastructure.getBufferSizeS()).get(); this.key = key; } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOp.java index 961884a0e..60ae6d8c6 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOp.java @@ -53,8 +53,8 @@ public MultiWindowOp(Multi upstream, super(upstream); this.size = ParameterValidation.positive(size, "size"); this.skip = ParameterValidation.positive(skip, "skip"); - this.processorQueueSupplier = Queues.unbounded(Queues.BUFFER_XS); - this.overflowQueueSupplier = Queues.unbounded(Queues.BUFFER_XS); + this.processorQueueSupplier = Queues.unbounded(Infrastructure.getBufferSizeXs()); + this.overflowQueueSupplier = Queues.unbounded(Infrastructure.getBufferSizeXs()); } @Override diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/BufferItemMultiEmitter.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/BufferItemMultiEmitter.java index f595051b3..7d5b48a2b 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/BufferItemMultiEmitter.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/BufferItemMultiEmitter.java @@ -11,13 +11,16 @@ public class BufferItemMultiEmitter extends BaseMultiEmitter { private final Queue queue; + private final int overflowBufferSize; private Throwable failure; private volatile boolean done; private final AtomicInteger wip = new AtomicInteger(); + private final AtomicInteger strictBoundCounter = new AtomicInteger(); - BufferItemMultiEmitter(MultiSubscriber actual, Queue queue) { + BufferItemMultiEmitter(MultiSubscriber actual, Queue queue, int overflowBufferSize) { super(actual); this.queue = queue; + this.overflowBufferSize = overflowBufferSize; } @Override @@ -30,7 +33,7 @@ public MultiEmitter emit(T t) { fail(new NullPointerException("`emit` called with `null`.")); return this; } - if (queue.offer(t)) { + if (queue.offer(t) && (overflowBufferSize == -1 || strictBoundCounter.incrementAndGet() < overflowBufferSize)) { drain(); } else { fail(new EmitterBufferOverflowException()); @@ -83,21 +86,20 @@ void drain() { } int missed = 1; - final Queue q = queue; do { - long r = requested.get(); - long e = 0L; + long pending = requested.get(); + long emitted = 0L; - while (e != r) { + while (emitted != pending) { if (isCancelled()) { - q.clear(); + queue.clear(); return; } boolean d = done; - T o = q.poll(); + T o = queue.poll(); boolean empty = o == null; @@ -115,23 +117,26 @@ void drain() { } try { + if (overflowBufferSize != -1) { + strictBoundCounter.decrementAndGet(); + } downstream.onItem(o); } catch (Throwable x) { cancel(); } - e++; + emitted++; } - if (e == r) { + if (emitted == pending) { if (isCancelled()) { - q.clear(); + queue.clear(); return; } boolean d = done; - boolean empty = q.isEmpty(); + boolean empty = queue.isEmpty(); if (d && empty) { if (failure != null) { @@ -143,8 +148,8 @@ void drain() { } } - if (e != 0) { - Subscriptions.produced(requested, e); + if (emitted != 0) { + Subscriptions.produced(requested, emitted); } missed = wip.addAndGet(-missed); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/EmitterBasedMulti.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/EmitterBasedMulti.java index 3015a7ff0..4deeb0b73 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/EmitterBasedMulti.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/EmitterBasedMulti.java @@ -17,16 +17,17 @@ public final class EmitterBasedMulti extends AbstractMulti { public static final int HINT = 16; private final Consumer> consumer; private final BackPressureStrategy backpressure; - private final int bufferSize; + private final int overflowBufferSize; public EmitterBasedMulti(Consumer> consumer, BackPressureStrategy backpressure) { this(consumer, backpressure, -1); } - public EmitterBasedMulti(Consumer> consumer, BackPressureStrategy backpressure, int bufferSize) { + public EmitterBasedMulti(Consumer> consumer, BackPressureStrategy backpressure, + int overflowBufferSize) { this.consumer = consumer; this.backpressure = backpressure; - this.bufferSize = bufferSize; + this.overflowBufferSize = overflowBufferSize; } @Override @@ -51,10 +52,11 @@ public void subscribe(MultiSubscriber downstream) { break; default: - if (bufferSize == -1) { - emitter = new BufferItemMultiEmitter<>(downstream, Queues. unbounded(HINT).get()); + if (overflowBufferSize == -1) { + emitter = new BufferItemMultiEmitter<>(downstream, Queues. unbounded(HINT).get(), overflowBufferSize); } else { - emitter = new BufferItemMultiEmitter<>(downstream, Queues.createStrictSizeQueue(bufferSize)); + emitter = new BufferItemMultiEmitter<>(downstream, Queues.createMpscArrayQueue(overflowBufferSize), + overflowBufferSize); } break; diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowBufferOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowBufferOp.java index c4d980db0..032631e88 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowBufferOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowBufferOp.java @@ -49,13 +49,14 @@ class OnOverflowBufferProcessor extends MultiOperatorProcessor { private final AtomicLong requested = new AtomicLong(); private final AtomicInteger wip = new AtomicInteger(); + private final AtomicInteger strictBoundCounter = new AtomicInteger(); volatile boolean cancelled; volatile boolean done; OnOverflowBufferProcessor(MultiSubscriber downstream, int bufferSize, boolean unbounded) { super(downstream); - this.queue = unbounded ? Queues. unbounded(bufferSize).get() : Queues.createStrictSizeQueue(bufferSize); + this.queue = unbounded ? Queues. unbounded(bufferSize).get() : Queues.createMpscArrayQueue(bufferSize); } @Override @@ -70,7 +71,7 @@ public void onSubscribe(Subscription subscription) { @Override public void onItem(T t) { - if (!queue.offer(t)) { + if ((!unbounded && strictBoundCounter.getAndIncrement() >= bufferSize) || !queue.offer(t)) { BackPressureFailure bpf = new BackPressureFailure( "The overflow buffer is full, which is due to the upstream sending too many items w.r.t. the downstream capacity and/or the downstream not consuming items fast enough"); if (dropUniMapper != null) { @@ -167,6 +168,9 @@ void drain() { if (wasEmpty) { break; } + if (!unbounded) { + strictBoundCounter.decrementAndGet(); + } downstream.onItem(item); emitted++; } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/processors/UnicastProcessor.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/processors/UnicastProcessor.java index ea2ed7b10..eb7b37a55 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/processors/UnicastProcessor.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/processors/UnicastProcessor.java @@ -12,6 +12,7 @@ import io.smallrye.mutiny.helpers.ParameterValidation; import io.smallrye.mutiny.helpers.Subscriptions; import io.smallrye.mutiny.helpers.queues.Queues; +import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.operators.AbstractMulti; import io.smallrye.mutiny.subscription.BackPressureFailure; import io.smallrye.mutiny.subscription.BackPressureStrategy; @@ -55,7 +56,7 @@ public class UnicastProcessor extends AbstractMulti implements Processor UnicastProcessor create() { - return new UnicastProcessor<>(Queues. unbounded(Queues.BUFFER_S).get(), null); + return new UnicastProcessor<>(Queues. unbounded(Infrastructure.getBufferSizeS()).get(), null); } /** diff --git a/implementation/src/main/java/module-info.java b/implementation/src/main/java/module-info.java index 0494bd048..15f97369d 100644 --- a/implementation/src/main/java/module-info.java +++ b/implementation/src/main/java/module-info.java @@ -1,6 +1,7 @@ open module io.smallrye.mutiny { requires transitive io.smallrye.common.annotation; + requires org.jctools.core; exports io.smallrye.mutiny; exports io.smallrye.mutiny.converters.multi; diff --git a/implementation/src/test/java/io/smallrye/mutiny/helpers/BlockingIterableTest.java b/implementation/src/test/java/io/smallrye/mutiny/helpers/BlockingIterableTest.java index dab5b4078..e71639cc0 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/helpers/BlockingIterableTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/helpers/BlockingIterableTest.java @@ -19,6 +19,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import org.jctools.queues.SpscArrayQueue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -26,7 +27,6 @@ import org.junit.jupiter.api.parallel.ResourceLock; import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.helpers.queues.SpscArrayQueue; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.operators.AbstractMulti; import io.smallrye.mutiny.subscription.BackPressureFailure; diff --git a/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java b/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java index 49264aa2f..c52a71fc1 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java @@ -1,109 +1,20 @@ package io.smallrye.mutiny.helpers.queues; -import static io.smallrye.mutiny.helpers.queues.Queues.BUFFER_S; -import static io.smallrye.mutiny.helpers.queues.Queues.BUFFER_XS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.jupiter.api.Assertions.assertThrows; -import java.util.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; import org.junit.jupiter.api.Test; @SuppressWarnings({ "rawtypes", "unchecked", "MismatchedQueryAndUpdateOfCollection" }) public class QueuesTest { - @Test - public void testUnboundedQueueCreation() { - Queue q = Queues.unbounded(10).get(); - assertThat(q).isInstanceOf(SpscLinkedArrayQueue.class); - - q = Queues.unbounded(Queues.BUFFER_XS).get(); - assertThat(q).isInstanceOf(SpscLinkedArrayQueue.class); - - q = Queues.unbounded(Queues.BUFFER_S).get(); - assertThat(q).isInstanceOf(SpscLinkedArrayQueue.class); - - q = Queues.unbounded(Integer.MAX_VALUE).get(); - assertThat(q).isInstanceOf(SpscLinkedArrayQueue.class); - } - - @Test - public void testCreationOfBoundedQueues() { - //the bounded queue floors at 8 and rounds to the next power of 2 - Queue queue = Queues.get(2).get(); - // 8 is the minimum - assertThat(getCapacity(queue)).isEqualTo(8); - assertThat(queue).isInstanceOf(SpscArrayQueue.class); - - queue = Queues.get(8).get(); - // 8 is the minimum - assertThat(getCapacity(queue)).isEqualTo(8); - assertThat(queue).isInstanceOf(SpscArrayQueue.class); - - queue = Queues.get(10).get(); - // next power of 2 after 8 - assertThat(getCapacity(queue)).isEqualTo(16); - assertThat(queue).isInstanceOf(SpscArrayQueue.class); - - // Special BUFFER_XS case - queue = Queues.get(BUFFER_XS).get(); - assertThat(getCapacity(queue)).isEqualTo(32); - assertThat(queue).isInstanceOf(SpscArrayQueue.class); - - // Special BUFFER_S case - queue = Queues.get(BUFFER_S).get(); - assertThat(getCapacity(queue)).isEqualTo(256); - assertThat(queue).isInstanceOf(SpscArrayQueue.class); - - queue = Queues.get(1).get(); - assertThat(getCapacity(queue)).isEqualTo(1); - assertThat(queue).isInstanceOf(SingletonQueue.class); - - queue = Queues.get(0).get(); - assertThat(getCapacity(queue)).isEqualTo(0); - assertThat(queue).isInstanceOf(EmptyQueue.class); - - queue = Queues.get(4).get(); - assertThat(getCapacity(queue)).isEqualTo(8); - assertThat(queue).isInstanceOf(SpscArrayQueue.class); - } - - @Test - public void testCreationOfUnboundedQueues() { - Queue queue = Queues.get(Integer.MAX_VALUE).get(); - assertThat(getCapacity(queue)).isEqualTo(Integer.MAX_VALUE); - assertThat(queue).isInstanceOf(SpscLinkedArrayQueue.class); - - // Not large enough to be unbounded: - queue = Queues.get(1000).get(); - // Next power of 2. - assertThat(getCapacity(queue)).isEqualTo(1024L); - assertThat(queue).isInstanceOf(SpscArrayQueue.class); - - queue = Queues.get(Queues.TO_LARGE_TO_BE_BOUNDED + 1).get(); - assertThat(getCapacity(queue)).isEqualTo(Integer.MAX_VALUE); - assertThat(queue).isInstanceOf(SpscLinkedArrayQueue.class); - - } - - private long getCapacity(Queue q) { - if (q instanceof EmptyQueue) { - return 0; - } - if (q instanceof SingletonQueue) { - return 1; - } - if (q instanceof SpscLinkedArrayQueue) { - return Integer.MAX_VALUE; - } else if (q instanceof SpscArrayQueue) { - return ((SpscArrayQueue) q).length(); - } - return -1; - } - @Test public void testEmptyQueue() { Queue queue = Queues. get(0).get(); @@ -220,280 +131,4 @@ public void testSingletonQueue() { assertThat(iterator.hasNext()).isFalse(); assertThat(iterator.next()).isNull(); } - - @Test - public void testThatSpscArrayQueueCannotReceiveNull() { - assertThrows(NullPointerException.class, () -> { - SpscArrayQueue q = new SpscArrayQueue<>(16); - q.offer(null); - }); - } - - @Test - public void testThatSpscLinkedArrayQueueCannotReceiveNull() { - assertThrows(NullPointerException.class, () -> { - SpscLinkedArrayQueue q = new SpscLinkedArrayQueue<>(16); - q.offer(null); - }); - } - - @Test - public void testThatMpscLinkedQueueCannotReceiveNull() { - assertThrows(NullPointerException.class, () -> { - MpscLinkedQueue q = new MpscLinkedQueue<>(); - q.offer(null); - }); - } - - @Test - public void testSpscArrayQueueOffer() { - SpscArrayQueue q = new SpscArrayQueue<>(16); - q.offer(1); - q.offer(2); - assertThat(q.size()).isEqualTo(2); - assertThat(q.peek()).isEqualTo(1); - assertThat(q.poll()).isEqualTo(1); - assertThat(q.peek()).isEqualTo(2); - assertThat(q.poll()).isEqualTo(2); - assertThat(q.poll()).isNull(); - } - - @Test - public void testSpscLinkedArrayQueueOffer() { - SpscLinkedArrayQueue q = new SpscLinkedArrayQueue<>(16); - q.offer(1); - q.offer(2); - assertThat(q.poll()).isEqualTo(1); - assertThat(q.poll()).isEqualTo(2); - assertThat(q.poll()).isNull(); - } - - @Test - public void testSpscLinkedArrayQueueBiOffer() { - SpscLinkedArrayQueue q = new SpscLinkedArrayQueue<>(16); - q.offer(1, 2); - assertThat(q.poll()).isEqualTo(1); - assertThat(q.poll()).isEqualTo(2); - assertThat(q.poll()).isNull(); - } - - @Test - public void testMpscLinkedQueueOffer() { - MpscLinkedQueue q = new MpscLinkedQueue<>(); - assertThat(q.isEmpty()).isTrue(); - q.offer(1); - q.offer(2); - assertThat(q.isEmpty()).isFalse(); - assertThat(q.poll()).isEqualTo(1); - assertThat(q.poll()).isEqualTo(2); - assertThat(q.poll()).isNull(); - assertThat(q.isEmpty()).isTrue(); - } - - @Test - public void testSpscCapacity() { - SpscArrayQueue q = new SpscArrayQueue<>(8); - assertThat(q.offer(1)).isTrue(); - assertThat(q.offer(2)).isTrue(); - assertThat(q.offer(3)).isTrue(); - assertThat(q.offer(4)).isTrue(); - assertThat(q.offer(5)).isTrue(); - assertThat(q.offer(6)).isTrue(); - assertThat(q.offer(7)).isTrue(); - assertThat(q.offer(8)).isTrue(); - assertThat(q.size()).isEqualTo(8); - - assertThat(q.offer(9)).isFalse(); - } - - @Test - public void testSpscLinkedNewBufferPeek() { - SpscLinkedArrayQueue q = new SpscLinkedArrayQueue<>(8); - assertThat(q.offer(1)).isTrue(); - assertThat(q.offer(2)).isTrue(); - assertThat(q.offer(3)).isTrue(); - assertThat(q.offer(4)).isTrue(); - assertThat(q.offer(5)).isTrue(); - assertThat(q.offer(6)).isTrue(); - assertThat(q.offer(7)).isTrue(); - assertThat(q.offer(8)).isTrue(); - assertThat(q.offer(9)).isTrue(); - - for (int i = 0; i < 9; i++) { - assertThat(q.peek()).isEqualTo(i + 1); - assertThat(q.poll()).isEqualTo(i + 1); - } - - assertThat(q.peek()).isNull(); - assertThat(q.poll()).isNull(); - } - - @Test - public void testSpscLinkedNewBufferPeekWithBiOffer() { - SpscLinkedArrayQueue q = new SpscLinkedArrayQueue<>(8); - assertThat(q.offer(1, 2)).isTrue(); - assertThat(q.offer(3, 4)).isTrue(); - assertThat(q.size()).isEqualTo(4); - assertThat(q.offer(5, 6)).isTrue(); - assertThat(q.offer(7, 8)).isTrue(); - assertThat(q.offer(9)).isTrue(); - assertThat(q.size()).isEqualTo(9); - - for (int i = 0; i < 9; i++) { - assertThat(q.peek()).isEqualTo(i + 1); - assertThat(q.poll()).isEqualTo(i + 1); - } - - assertThat(q.peek()).isNull(); - assertThat(q.poll()).isNull(); - } - - @Test - public void testMpscOfferPollRace() throws Exception { - MpscLinkedQueue q = new MpscLinkedQueue<>(); - CountDownLatch start = new CountDownLatch(3); - - final AtomicInteger c = new AtomicInteger(3); - - Thread t1 = new Thread(new Runnable() { - int i; - - @Override - public void run() { - start.countDown(); - try { - start.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - while (i++ < 10000) { - q.offer(i); - } - } - }); - t1.start(); - - Thread t2 = new Thread(new Runnable() { - int i = 10000; - - @Override - public void run() { - start.countDown(); - try { - start.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - while (i++ < 10000) { - q.offer(i); - } - } - }); - t2.start(); - - Runnable r3 = new Runnable() { - int i = 20000; - - @Override - public void run() { - start.countDown(); - try { - start.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - while (--i > 0) { - q.poll(); - } - } - }; - r3.run(); - - t1.join(); - t2.join(); - } - - @SuppressWarnings("ResultOfMethodCallIgnored") - @Test - public void testUnsupportedAPIFromMpsc() { - MpscLinkedQueue q = new MpscLinkedQueue<>(); - q.offer(1); - q.offer(2); - - assertThatThrownBy(() -> q.add(3)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.remove(2)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.remove()) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.addAll(Arrays.asList(4, 5, 6))) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.containsAll(Arrays.asList(4, 5, 6))) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.contains(1)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(q::size) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.removeAll(Arrays.asList(4, 5, 6))) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.retainAll(Arrays.asList(4, 5, 6))) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(q::element) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(q::peek) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(q::iterator) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.toArray()) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.toArray(new Integer[0])) - .isInstanceOf(UnsupportedOperationException.class); - } - - @SuppressWarnings("ResultOfMethodCallIgnored") - @Test - public void testUnsupportedAPIFromSpscArrayQueue() { - SpscArrayQueue q = new SpscArrayQueue<>(3); - q.offer(1); - q.offer(2); - - assertThatThrownBy(() -> q.add(3)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.remove(2)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.remove()) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.addAll(Arrays.asList(4, 5, 6))) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.containsAll(Arrays.asList(4, 5, 6))) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.contains(1)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.removeAll(Arrays.asList(4, 5, 6))) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.retainAll(Arrays.asList(4, 5, 6))) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(q::element) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(q::iterator) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.toArray()) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.toArray(new Integer[0])) - .isInstanceOf(UnsupportedOperationException.class); - } - - @Test - public void testUnsupportedAPIFromSpscLinkedArrayQueue() { - SpscLinkedArrayQueue q = new SpscLinkedArrayQueue<>(5); - q.offer(1); - q.offer(2); - // Other methods are implemented by AbstractCollection. - assertThatThrownBy(q::iterator) - .isInstanceOf(UnsupportedOperationException.class); - } - } diff --git a/implementation/src/test/java/io/smallrye/mutiny/infrastructure/InfrastructureTest.java b/implementation/src/test/java/io/smallrye/mutiny/infrastructure/InfrastructureTest.java index b89e1df08..154de76b4 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/infrastructure/InfrastructureTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/infrastructure/InfrastructureTest.java @@ -1,6 +1,7 @@ package io.smallrye.mutiny.infrastructure; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertThrows; import org.junit.jupiter.api.AfterEach; @@ -40,4 +41,49 @@ void acceptCorrectMultiOverflowBufferSizes() { Infrastructure.setMultiOverflowDefaultBufferSize(256); assertThat(Infrastructure.getMultiOverflowDefaultBufferSize()).isEqualTo(256); } + + @Test + @DisplayName("Buffer sizes definitions when there are no matching system properties") + void bufferSizesNoSysProp() { + assertThat(Infrastructure.getBufferSizeXs()).isEqualTo(32); + assertThat(Infrastructure.getBufferSizeS()).isEqualTo(256); + + Infrastructure.setBufferSizeXs(4); + assertThat(Infrastructure.getBufferSizeXs()).isEqualTo(4); + + Infrastructure.setBufferSizeS(4); + assertThat(Infrastructure.getBufferSizeS()).isEqualTo(4); + + assertThatThrownBy(() -> Infrastructure.setBufferSizeXs(0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("size"); + + assertThatThrownBy(() -> Infrastructure.setBufferSizeXs(-1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("size"); + + assertThatThrownBy(() -> Infrastructure.setBufferSizeS(0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("size"); + + assertThatThrownBy(() -> Infrastructure.setBufferSizeS(-1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("size"); + } + + @Test + @DisplayName("Buffer sizes definitions when there are matching system properties") + void bufferSizesWithSysProp() { + try { + System.setProperty("mutiny.buffer-size.s", "1024"); + System.setProperty("mutiny.buffer-size.xs", "64"); + assertThat(Infrastructure.getBufferSizeXs()).isEqualTo(64); + assertThat(Infrastructure.getBufferSizeS()).isEqualTo(1024); + } finally { + System.clearProperty("mutiny.buffer-size.s"); + System.clearProperty("mutiny.buffer-size.xs"); + } + assertThat(Infrastructure.getBufferSizeXs()).isEqualTo(32); + assertThat(Infrastructure.getBufferSizeS()).isEqualTo(256); + } } diff --git a/pom.xml b/pom.xml index 4e1f05170..d2ec123f2 100644 --- a/pom.xml +++ b/pom.xml @@ -77,6 +77,7 @@ + 4.0.2 1.0.0 1.0.4 3.1.8 @@ -132,6 +133,11 @@ mutiny ${project.version} + + org.jctools + jctools-core + ${jctools-core.version} + io.smallrye.reactive mutiny-zero-flow-adapters