Skip to content

Commit

Permalink
First batch of Franz-ification
Browse files Browse the repository at this point in the history
  • Loading branch information
jponge committed Oct 19, 2023
1 parent 02a427e commit bb22659
Showing 1 changed file with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import java.util.function.Supplier;

import org.jctools.queues.atomic.MpscAtomicArrayQueue;
import org.jctools.queues.atomic.MpscLinkedAtomicQueue;
import org.jctools.queues.atomic.MpscUnboundedAtomicArrayQueue;
import org.jctools.queues.atomic.SpscAtomicArrayQueue;
import org.jctools.queues.atomic.SpscChunkedAtomicArrayQueue;
import org.jctools.queues.atomic.SpscUnboundedAtomicArrayQueue;
import org.jctools.queues.unpadded.MpscLinkedUnpaddedQueue;
import org.jctools.queues.unpadded.MpscUnboundedUnpaddedArrayQueue;
import org.jctools.queues.unpadded.MpscUnpaddedArrayQueue;
import org.jctools.queues.unpadded.SpscChunkedUnpaddedArrayQueue;
import org.jctools.queues.unpadded.SpscUnboundedUnpaddedArrayQueue;
import org.jctools.queues.unpadded.SpscUnpaddedArrayQueue;

Expand All @@ -17,11 +19,6 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
public class Queues {

/**
* Queues with a requested with a capacity greater than this value are unbounded.
*/
public static final int TOO_LARGE_TO_BE_BOUNDED = 10_000_000;

private Queues() {
// avoid direct instantiation
}
Expand All @@ -42,6 +39,14 @@ public static <T> Queue<T> createSpscUnboundedArrayQueue(int size) {
}
}

public static <T> Queue<T> createSpscChunkedArrayQueue(int size) {
if (Infrastructure.useUnsafeForQueues()) {
return new SpscChunkedUnpaddedArrayQueue<>(size);
} else {
return new SpscChunkedAtomicArrayQueue<>(size);
}
}

public static <T> Supplier<Queue<T>> getXsQueueSupplier() {
return () -> createSpscArrayQueue(Infrastructure.getBufferSizeXs());
}
Expand Down Expand Up @@ -72,12 +77,7 @@ public static <T> Supplier<Queue<T>> get(int bufferSize) {
return EmptyQueue::new;
}

final int computedSize = Math.max(8, bufferSize);
if (computedSize > TOO_LARGE_TO_BE_BOUNDED) {
return () -> createSpscUnboundedArrayQueue(Infrastructure.getBufferSizeS());
} else {
return () -> createSpscArrayQueue(computedSize);
}
return () -> createSpscChunkedArrayQueue(bufferSize);
}

/**
Expand Down Expand Up @@ -107,9 +107,9 @@ public static <T> Supplier<Queue<T>> unbounded(int size) {
*/
public static <T> Queue<T> createMpscQueue() {
if (Infrastructure.useUnsafeForQueues()) {
return new MpscLinkedUnpaddedQueue<>();
return new MpscUnboundedUnpaddedArrayQueue<>(Infrastructure.getBufferSizeS());
} else {
return new MpscLinkedAtomicQueue<>();
return new MpscUnboundedAtomicArrayQueue<>(Infrastructure.getBufferSizeS());
}
}

Expand Down

0 comments on commit bb22659

Please sign in to comment.