Skip to content

Commit

Permalink
Move queue size constants to Infrastructure class
Browse files Browse the repository at this point in the history
  • Loading branch information
jponge committed Oct 17, 2023
1 parent 94cdcd5 commit 7e91a7e
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 48 deletions.
12 changes: 12 additions & 0 deletions implementation/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@
"code": "java.class.removed",
"old": "class io.smallrye.mutiny.helpers.queues.SpscLinkedArrayQueue<E>",
"justification": "Refactoring of internal APIs"
},
{
"ignore": true,
"code": "java.field.removed",
"old": "field io.smallrye.mutiny.helpers.queues.Queues.BUFFER_S",
"justification": "Refactoring of internal APIs"
},
{
"ignore": true,
"code": "java.field.removed",
"old": "field io.smallrye.mutiny.helpers.queues.Queues.BUFFER_XS",
"justification": "Refactoring of internal APIs"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.CompositeException;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.multi.MultiConcatMapOp;
import io.smallrye.mutiny.operators.multi.MultiFlatMapOp;
Expand Down Expand Up @@ -77,7 +76,7 @@ public MultiFlatten<I, O> withRequests(int requests) {
*/
@CheckReturnValue
public Multi<O> merge() {
return merge(Queues.BUFFER_S);
return merge(Infrastructure.getBufferSizeS());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.jctools.queues.SpscArrayQueue;
import org.jctools.queues.SpscUnboundedArrayQueue;

import io.smallrye.mutiny.infrastructure.Infrastructure;

@SuppressWarnings({ "rawtypes", "unchecked" })
public class Queues {

Expand All @@ -20,20 +22,14 @@ private Queues() {
// avoid direct instantiation
}

public static final int BUFFER_XS = Math.max(8,
Integer.parseInt(System.getProperty("mutiny.buffer-size.xs", "32")));

public static final int BUFFER_S = Math.max(16,
Integer.parseInt(System.getProperty("mutiny.buffer-size.s", "256")));

static final Supplier EMPTY_QUEUE_SUPPLIER = EmptyQueue::new;
static final Supplier SINGLETON_QUEUE_SUPPLIER = SingletonQueue::new;

static final Supplier XS_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(BUFFER_XS);
static final Supplier S_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(BUFFER_S);
static final Supplier XS_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(Infrastructure.getBufferSizeXs());
static final Supplier S_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(Infrastructure.getBufferSizeS());

static final Supplier UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscUnboundedArrayQueue<>(BUFFER_S);
static final Supplier XS_UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscUnboundedArrayQueue<>(BUFFER_XS);
static final Supplier UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscUnboundedArrayQueue<>(Infrastructure.getBufferSizeS());
static final Supplier XS_UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscUnboundedArrayQueue<>(Infrastructure.getBufferSizeXs());

public static <T> Supplier<Queue<T>> getXsQueueSupplier() {
return (Supplier<Queue<T>>) XS_QUEUE_SUPPLIER;
Expand All @@ -49,11 +45,11 @@ public static <T> Supplier<Queue<T>> getXsQueueSupplier() {
* @return the supplier.
*/
public static <T> Supplier<Queue<T>> get(int bufferSize) {
if (bufferSize == BUFFER_XS) {
if (bufferSize == Infrastructure.getBufferSizeXs()) {
return XS_QUEUE_SUPPLIER;
}

if (bufferSize == BUFFER_S) {
if (bufferSize == Infrastructure.getBufferSizeS()) {
return S_QUEUE_SUPPLIER;
}

Expand Down Expand Up @@ -83,9 +79,9 @@ public static <T> Supplier<Queue<T>> get(int bufferSize) {
*/
@SuppressWarnings("unchecked")
public static <T> Supplier<Queue<T>> unbounded(int size) {
if (size == BUFFER_XS) {
if (size == Infrastructure.getBufferSizeXs()) {
return XS_UNBOUNDED_QUEUE_SUPPLIER;
} else if (size == Integer.MAX_VALUE || size == BUFFER_S) {
} else if (size == Integer.MAX_VALUE || size == Infrastructure.getBufferSizeS()) {
return UNBOUNDED_QUEUE_SUPPLIER;
} else {
return () -> new SpscUnboundedArrayQueue<>(size);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,16 @@
package io.smallrye.mutiny.infrastructure;

import static io.smallrye.mutiny.helpers.ParameterValidation.*;
import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.function.*;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.MultiOverflowStrategy;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.tuples.Functions;

Expand Down Expand Up @@ -66,12 +47,17 @@ public class Infrastructure {

private static int multiOverflowDefaultBufferSize = 128;

private static int bufferSizeXs = 32;
private static int bufferSizeS = 256;

public static void reload() {
clearInterceptors();
reloadUniInterceptors();
reloadMultiInterceptors();
reloadCallbackDecorators();
multiOverflowDefaultBufferSize = 128;
bufferSizeXs = 32;
bufferSizeS = 256;
}

/**
Expand Down Expand Up @@ -315,7 +301,7 @@ public static boolean canCallerThreadBeBlocked() {
* @param handler the handler, must not be {@code null} and must not throw an exception or it will also be lost.
*/
public static void setDroppedExceptionHandler(Consumer<Throwable> handler) {
ParameterValidation.nonNull(handler, "handler");
nonNull(handler, "handler");
droppedExceptionHandler = handler;
}

Expand Down Expand Up @@ -404,7 +390,7 @@ public static void logFromOperator(String identifier, String event, Object value
* @param operatorLogger the new operator logger
*/
public static void setOperatorLogger(OperatorLogger operatorLogger) {
Infrastructure.operatorLogger = ParameterValidation.nonNull(operatorLogger, "operatorLogger");
Infrastructure.operatorLogger = nonNull(operatorLogger, "operatorLogger");
}

// For testing purpose only
Expand All @@ -427,7 +413,53 @@ public static int getMultiOverflowDefaultBufferSize() {
* @param size the buffer size, must be strictly positive
*/
public static void setMultiOverflowDefaultBufferSize(int size) {
multiOverflowDefaultBufferSize = ParameterValidation.positive(size, "size");
multiOverflowDefaultBufferSize = positive(size, "size");
}

/**
* Get the xs buffer size (for internal usage).
*
* @return the buffer size
*/
public static int getBufferSizeXs() {
String propVal = System.getProperty("mutiny.buffer-size.xs");
if (propVal != null) {
return Math.max(8, Integer.parseInt(propVal));
} else {
return bufferSizeXs;
}
}

/**
* Set the xs buffer size (for internal usage).
*
* @param size the buffer size
*/
public static void setBufferSizeXs(int size) {
bufferSizeXs = positive(size, "size");
}

/**
* Get the s buffer size (for internal usage).
*
* @return the buffer size
*/
public static int getBufferSizeS() {
String propVal = System.getProperty("mutiny.buffer-size.s");
if (propVal != null) {
return Math.max(16, Integer.parseInt(propVal));
} else {
return bufferSizeS;
}
}

/**
* Set the xs buffer size (for internal usage).
*
* @param size the buffer size
*/
public static void setBufferSizeS(int size) {
bufferSizeS = positive(size, "size");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.subscription.BackPressureFailure;
import io.smallrye.mutiny.subscription.MultiSubscriber;

Expand All @@ -26,7 +27,7 @@
public class MultiEmitOnOp<T> extends AbstractMultiOperator<T, T> {

private final Executor executor;
private final Supplier<? extends Queue<T>> queueSupplier = Queues.get(Queues.BUFFER_S);
private final Supplier<? extends Queue<T>> queueSupplier = Queues.get(Infrastructure.getBufferSizeS());

public MultiEmitOnOp(Multi<? extends T> upstream, Executor executor) {
super(upstream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public MultiGroupByProcessor(MultiSubscriber<? super GroupedMulti<K, V>> downstr
this.keySelector = keySelector;
this.valueSelector = valueSelector;
this.groups = groups;
this.queue = Queues.<GroupedMulti<K, V>> unbounded(Queues.BUFFER_S).get();
this.queue = Queues.<GroupedMulti<K, V>> unbounded(Infrastructure.getBufferSizeS()).get();
}

@Override
Expand Down Expand Up @@ -314,7 +314,7 @@ private static final class State<T, K> implements Flow.Subscription, Flow.Publis
@SuppressWarnings("unchecked")
State(MultiGroupByProcessor<?, K, T> parent, K key) {
this.parent = parent;
this.queue = (Queue<T>) Queues.unbounded(Queues.BUFFER_S).get();
this.queue = (Queue<T>) Queues.unbounded(Infrastructure.getBufferSizeS()).get();
this.key = key;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public MultiWindowOp(Multi<? extends T> upstream,
super(upstream);
this.size = ParameterValidation.positive(size, "size");
this.skip = ParameterValidation.positive(skip, "skip");
this.processorQueueSupplier = Queues.unbounded(Queues.BUFFER_XS);
this.overflowQueueSupplier = Queues.unbounded(Queues.BUFFER_XS);
this.processorQueueSupplier = Queues.unbounded(Infrastructure.getBufferSizeXs());
this.overflowQueueSupplier = Queues.unbounded(Infrastructure.getBufferSizeXs());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.subscription.BackPressureFailure;
import io.smallrye.mutiny.subscription.BackPressureStrategy;
Expand Down Expand Up @@ -55,7 +56,7 @@ public class UnicastProcessor<T> extends AbstractMulti<T> implements Processor<T
* @return the unicast processor
*/
public static <I> UnicastProcessor<I> create() {
return new UnicastProcessor<>(Queues.<I> unbounded(Queues.BUFFER_S).get(), null);
return new UnicastProcessor<>(Queues.<I> unbounded(Infrastructure.getBufferSizeS()).get(), null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.smallrye.mutiny.infrastructure;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -40,4 +41,49 @@ void acceptCorrectMultiOverflowBufferSizes() {
Infrastructure.setMultiOverflowDefaultBufferSize(256);
assertThat(Infrastructure.getMultiOverflowDefaultBufferSize()).isEqualTo(256);
}

@Test
@DisplayName("Buffer sizes definitions when there are no matching system properties")
void bufferSizesNoSysProp() {
assertThat(Infrastructure.getBufferSizeXs()).isEqualTo(32);
assertThat(Infrastructure.getBufferSizeS()).isEqualTo(256);

Infrastructure.setBufferSizeXs(4);
assertThat(Infrastructure.getBufferSizeXs()).isEqualTo(4);

Infrastructure.setBufferSizeS(4);
assertThat(Infrastructure.getBufferSizeS()).isEqualTo(4);

assertThatThrownBy(() -> Infrastructure.setBufferSizeXs(0))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("size");

assertThatThrownBy(() -> Infrastructure.setBufferSizeXs(-1))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("size");

assertThatThrownBy(() -> Infrastructure.setBufferSizeS(0))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("size");

assertThatThrownBy(() -> Infrastructure.setBufferSizeS(-1))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("size");
}

@Test
@DisplayName("Buffer sizes definitions when there are matching system properties")
void bufferSizesWithSysProp() {
try {
System.setProperty("mutiny.buffer-size.s", "1024");
System.setProperty("mutiny.buffer-size.xs", "64");
assertThat(Infrastructure.getBufferSizeXs()).isEqualTo(64);
assertThat(Infrastructure.getBufferSizeS()).isEqualTo(1024);
} finally {
System.clearProperty("mutiny.buffer-size.s");
System.clearProperty("mutiny.buffer-size.xs");
}
assertThat(Infrastructure.getBufferSizeXs()).isEqualTo(32);
assertThat(Infrastructure.getBufferSizeS()).isEqualTo(256);
}
}

0 comments on commit 7e91a7e

Please sign in to comment.