Skip to content

Commit

Permalink
Avoid direct concrete instance creation
Browse files Browse the repository at this point in the history
  • Loading branch information
jponge committed Oct 17, 2023
1 parent 7e91a7e commit 16fdd09
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ public static <T> Queue<T> createMpscQueue() {
return new MpscLinkedQueue<>();
}

/**
* Creates an unbounded single producer / single consumer queue.
*
* @param size the chunk size
* @return the queue
* @param <T> the item type
*/
public static <T> Queue<T> createSpscUnboundedQueue(int size) {
return new SpscUnboundedArrayQueue<>(size);
}

/**
* Create a MPSC queue with a given size
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.jctools.queues.SpscUnboundedArrayQueue;

import io.smallrye.mutiny.Context;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.MultiOperator;
import io.smallrye.mutiny.subscription.ContextSupport;
Expand Down Expand Up @@ -78,7 +77,7 @@ private static final class CombineLatestCoordinator<I, O> implements Subscriptio
private final MultiSubscriber<? super O> downstream;
private final Function<List<?>, ? extends O> combinator;
private final List<CombineLatestInnerSubscriber<I>> subscribers = new ArrayList<>();
private final SpscUnboundedArrayQueue<Object> queue;
private final Queue<Object> queue;
private final Object[] latest;
private final boolean delayErrors;

Expand Down Expand Up @@ -107,7 +106,7 @@ private static final class CombineLatestCoordinator<I, O> implements Subscriptio
subscribers.add(new CombineLatestInnerSubscriber<>(context, this, i, bufferSize));
}
this.latest = new Object[size];
this.queue = new SpscUnboundedArrayQueue<>(bufferSize);
this.queue = Queues.createSpscUnboundedQueue(bufferSize);
this.delayErrors = delayErrors;
}

Expand Down

0 comments on commit 16fdd09

Please sign in to comment.