getQueue() {
- return _queues.get();
- }
-
- public RetryCountSlidingWindow(
- int updateIntervalMs,
- int slidingWindowLengthInSecond,
- ScheduledExecutorService updateExecutor) {
- examineInput(slidingWindowLengthInSecond, "slidingWindowLengthInSecond");
- examineInput(updateIntervalMs, "updateIntervalMs");
-
- this._slidingWindowLengthInSecond = slidingWindowLengthInSecond;
-
- LOG.info(
- "Building a RetrySlidingWindow with updateInterval = {} ms and slidingWindowLength = {} seconds",
- updateIntervalMs,
- slidingWindowLengthInSecond);
- ScheduledExecutorService updater = Objects.requireNonNull(updateExecutor, "Null updateExecutor");
- updater.scheduleWithFixedDelay(this::updateCount, 0, updateIntervalMs, TimeUnit.MILLISECONDS);
- }
-
- private void updateCount() {
- _lastUpdateTsByUpdated = RetryCounter.getCurrentSecond();
- new SimpleCount().count();
- }
-
- private class SimpleCount {
- long total;
- long retry;
-
- public void count() {
- _queueMap.values().forEach(q -> {
- q.removeOldCounters();
- total += q.getTotalCount();
- retry += q.getTotalRetryCount();
- });
-
- _retryRatio = total > 0 ? (double) retry / (double) total : 0.0;
- _totalCount = total;
- _retryCount = retry;
- }
- }
-
-}
diff --git a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/misc/TouchTimer.java b/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/misc/TouchTimer.java
deleted file mode 100644
index f95ae2f549d..00000000000
--- a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/misc/TouchTimer.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/*
- * $Id$
- */
-package com.linkedin.alpini.base.misc;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Formatter;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-import org.apache.logging.log4j.util.StringBuilderFormattable;
-
-
-/**
- * Handy utility class to track the timing while processing a request. A class can create a TouchTimer,
- * then "touch" it at various points during processing. At the end, if the request took a long time,
- * it can log the output. For example:
- *
- *
- * void doSomething()
- * {
- * TouchTimer timer = new TouchTimer();
- * timer.touch("Calling fooService");
- * fooService.doSomething();
- *
- * timer.touch("Calling barService");
- * barService.doSomething();
- *
- * timer.touch("All Done!");
- *
- * if(timer.getElapsedTimeMillis() > 1000)
- * {
- * log.warn("doSomething took over a second to complete! " + timer);
- * }
- * }
- *
- *
- * The block of code tracks the time to call each service. If the total time spent was greater than 1 second, it logs
- * a message. The logged message will include timing for each "touch". The timer is thread safe, and output from
- * TouchTimer.toString() will include the thread name along with each touch.
- *
- * @author Jemiah Westerman <jwesterman@linkedin.com>
- * @version $Revision$
- */
-public final class TouchTimer implements StringBuilderFormattable {
- /**
- * Default maximum number of messages allowed on one timer.
- * This is a safety to prevent a timer from growing without bounds
- * if someone has it in a ThreadLocal or a static or something like that.
- */
- public static final int DEFAULT_MAX_MESSAGES;
-
- public static final String MAX_MESSAGES_PROPERTY = "com.linkedin.alpini.base.misc.TouchTimer.max";
-
- /**
- * Message used to indicate that the maximum number of messages has been reached.
- */
- private static final String MAX_MESSAGES_WARNING;
-
- private static final String DATE_FORMAT = "%1$tY/%1$tm/%1$td %1$tT.%1$tL";
-
- /**
- * Stores a singleton (deduped) copy of the thread name. The deduplication ensures that if we reference the same thread
- * name multiple times, that we only reference a single instance of that String on the heap.
- */
- private static final ThreadLocal DEDUPED_THREAD_NAME = new ThreadLocal<>();
-
- private final int _maxMessageCount;
-
- /** Non-null sentinel; _first._next points to the oldest message in the list. */
- private Message _first = new Message(0, null, null, null, null);
-
- /** Last (most recent) message in the list. */
- private volatile Message _last = _first;
-
- /** Number of messages in the list. */
- private int _messageCount;
-
- private final long _startTimeMillis;
- private final long _startTimeNanos;
-
- /**
- * Default constructor
- */
- public TouchTimer() {
- this(DEFAULT_MAX_MESSAGES);
- }
-
- /**
- * Alternative constructor
- * @param maxMessageCount Maximum number of messages
- */
- public TouchTimer(@Nonnegative int maxMessageCount) {
- this(maxMessageCount, Time.currentTimeMillis(), Time.nanoTime());
- }
-
- public TouchTimer(
- @Nonnegative long startTimeMillis,
- @Nonnegative long startTimeNanos,
- @Nonnegative int maxMessageCount) {
- this(maxMessageCount, startTimeMillis, startTimeNanos);
- addMessage(new Message(startTimeNanos, getDedupedThreadName(), "start", null, null));
- }
-
- private TouchTimer(int maxMessageCount, long startTimeMillis, long startTimeNanos) {
- _maxMessageCount = maxMessageCount;
- _startTimeMillis = startTimeMillis;
- _startTimeNanos = startTimeNanos;
- }
-
- public long getStartTimeMillis() {
- return _startTimeMillis;
- }
-
- public long getStartTimeNanos() {
- return _startTimeNanos;
- }
-
- /**
- * Add an event to this timer.
- * @param name event name
- * @param klass class generating the event
- */
- public void touch(Class> klass, String name, Object... args) {
- String threadName = getDedupedThreadName();
-
- addMessage(
- new Message(Time.nanoTime(), threadName, name, args != null && args.length > 0 ? args.clone() : null, klass));
- }
-
- /**
- * Add an event to this timer.
- * @param name event name
- */
- public void touch(Class> klass, String name) {
- String threadName = getDedupedThreadName();
-
- addMessage(new Message(Time.nanoTime(), threadName, name, null, klass));
- }
-
- /**
- * Add an event to this timer.
- * @param name event name
- */
- public void touch(String name, Object... args) {
- touch(null, name, args);
- }
-
- /**
- * Add an event to this timer.
- * @param name event name
- */
- public void touch(String name) {
- touch(null, name);
- }
-
- /**
- * Add a new message to the list in a thread-safe way without using synchronization nor locks.
- * @param message message to add
- */
- private void addMessage(@Nonnull Message message) {
- Message last = _last;
- for (;;) {
- message._count = last._count + 1;
- if (message._count > _maxMessageCount) {
- // DEFAULT_MAX_MESSAGES exceeded, discard the message
- return;
- }
- if (last._next == null && NEXT_UPDATER.compareAndSet(last, null, message)) {
- _messageCount = message._count;
- _last = message;
-
- // We exceeded DEFAULT_MAX_MESSAGES. Add a warning and don't store any more touches for this timer.
- if (message._count == _maxMessageCount) {
- message._next = new Message(
- Time.nanoTime(),
- message._threadName,
- _maxMessageCount == DEFAULT_MAX_MESSAGES ? MAX_MESSAGES_WARNING : getMaxMessagesWarning(_maxMessageCount),
- null,
- message._klass);
- }
-
- return;
- }
- last = last._next;
- }
- }
-
- private static long nanosToMillis(long nanos) {
- return TimeUnit.NANOSECONDS.toMillis(nanos);
- }
-
- /**
- * Return the total time elapsed between the first and last events.
- * @return time in milliseconds
- */
- public long getElapsedTimeMillis() {
- return nanosToMillis(getElapsedTimeNanos());
- }
-
- /**
- * Return the total time elapsed between the first and last events.
- * @return time in nanoseconds
- */
- public long getElapsedTimeNanos() {
- return _messageCount == 0 ? 0 : _last._nanos - _first._next._nanos;
- }
-
- @Override
- public String toString() {
- StringBuilder trace = new StringBuilder();
- formatTo(trace);
- return trace.toString();
- }
-
- @Override
- public void formatTo(StringBuilder trace) {
- Formatter formatter = new Formatter(trace);
-
- trace.append("Total Time: ");
- TimeFormat.formatTimespan(getElapsedTimeMillis(), trace);
- trace.append(" Trace: ");
-
- long prevMessageNanos = 0;
-
- Message current = _first._next;
- while (current != null) {
- // If this is not the first message then out the time since the previous message
- if (prevMessageNanos != 0) {
- trace.append(' ');
- TimeFormat.formatTimespan(nanosToMillis(current._nanos - prevMessageNanos), trace);
- trace.append(' ');
- }
-
- trace.append("[").append(current._threadName).append(' ');
-
- // if a class was given, then build the message name using the class name.
- if (current._klass != null) {
- String longClassName = current._klass.getName();
- int lastDot = longClassName.lastIndexOf('.');
- trace.append(longClassName.subSequence(lastDot + 1, longClassName.length())).append(':');
- }
-
- if (current._args == null) {
- trace.append(current._name);
- } else {
- formatter.format(current._name, current._args);
- }
-
- trace.append(' ');
-
- long timeStamp = _startTimeMillis + nanosToMillis(current._nanos - _startTimeNanos);
- formatter.format(DATE_FORMAT, new Date(timeStamp));
-
- trace.append(']');
-
- prevMessageNanos = current._nanos;
- current = current._next;
- }
- }
-
- /** Return the messages list for this TouchTimer. For use in unit tests. */
- /* package private */ List getMessages() {
- List messages = new ArrayList(_messageCount);
- Message current = _first._next;
- while (current != null) {
- messages.add(current);
- current = current._next;
- }
- return Collections.unmodifiableList(messages);
- }
-
- /**
- * Visit all the messages for this TouchTimer
- * @param visitor message visitor
- */
- public void forEachMessage(@Nonnull Visitor visitor) {
- Message current = _first._next;
- while (current != null) {
- long timeStamp = _startTimeMillis + nanosToMillis(current._nanos - _startTimeNanos);
- visitor.visit(
- timeStamp,
- current._threadName,
- current._klass,
- current._name,
- current._args != null ? current._args.clone() : null);
- current = current._next;
- }
- }
-
- /**
- * Return the deduplicated name of the current thread. Deduplication makes suer that multiple references to the same thread name
- * reference a single String instance.
- */
- /*package private*/ static String getDedupedThreadName() {
- // Get the de-duped thread name from the map from the ThreadLocal.
- // If the name wasn't already cached or if it the name has changed, then we have to update the ThreadLocal.
- String localThreadName = Thread.currentThread().getName();
- String dedupedThreadName = DEDUPED_THREAD_NAME.get();
- if (dedupedThreadName == null || !dedupedThreadName.equals(localThreadName)) {
- DEDUPED_THREAD_NAME.set(localThreadName);
- dedupedThreadName = localThreadName;
- }
-
- // Return the deduped name
- return dedupedThreadName;
- }
-
- /**
- * Message Visitor interface
- */
- public interface Visitor {
- void visit(long timestamp, String threadName, Class> klass, String message, Object[] args);
- }
-
- /**
- * Stores an individual "touch" with the timestamp and relevant data. We try to keep the memory footprint as small
- * as possible, so we just store references to the arguments instead of storing the fully formatted String as it
- * would be printed in the log. Since the vast majority of these will not be logged (typical use case is to only log
- * very slow requests), it is preferable to only do that final formatting if and when we actually need it.
- */
- /* package private */ static final class Message {
- final long _nanos;
- final String _threadName;
- final String _name;
- final Object[] _args;
- final Class> _klass;
- int _count;
- volatile Message _next;
-
- public Message(long nanos, String threadName, String name, Object[] args, Class> klass) {
- _nanos = nanos;
- _threadName = threadName;
- _name = name;
- _args = args;
- _klass = klass;
- }
- }
-
- private static final AtomicReferenceFieldUpdater NEXT_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(Message.class, Message.class, "_next");
-
- private static String getMaxMessagesWarning(int maxMessageCount) {
- return "TouchTimer Warning: Exceeded the maximum number of messages allowed (" + maxMessageCount
- + "). No further messages will be logged for this timer.";
- }
-
- static {
- DEFAULT_MAX_MESSAGES = Integer.parseUnsignedInt(System.getProperty(MAX_MESSAGES_PROPERTY, "2000"));
-
- MAX_MESSAGES_WARNING = getMaxMessagesWarning(DEFAULT_MAX_MESSAGES);
- }
-}
diff --git a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/misc/TriFunction.java b/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/misc/TriFunction.java
deleted file mode 100644
index 0359c4c7cb4..00000000000
--- a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/misc/TriFunction.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package com.linkedin.alpini.base.misc;
-
-@FunctionalInterface
-public interface TriFunction {
- /**
- * Applies this function to the given arguments.
- *
- * @param t the first function argument
- * @param u the second function argument
- * @param s the third function argument
- * @return the function result
- */
- R apply(T t, U u, S s);
-}
diff --git a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/monitoring/CallCompletion.java b/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/monitoring/CallCompletion.java
deleted file mode 100644
index 970131f267a..00000000000
--- a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/monitoring/CallCompletion.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package com.linkedin.alpini.base.monitoring;
-
-import com.linkedin.alpini.base.misc.Time;
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-
-
-/**
- * @author Antony T Curtis {@literal }
- */
-public interface CallCompletion extends AutoCloseable {
- default void close() {
- close(Time.nanoTime());
- }
-
- default void closeWithError() {
- closeWithError(Time.nanoTime());
- }
-
- default void closeWithError(@Nonnull Throwable error) {
- closeWithError(Time.nanoTime(), error);
- }
-
- default void closeCompletion(T value, Throwable error) {
- closeCompletion(Time.nanoTime(), value, error);
- }
-
- void close(@Nonnegative long endTimeNanos);
-
- default void closeWithError(@Nonnegative long endTimeNanos) {
- closeWithError(endTimeNanos, CallTracker.GENERIC_EXCEPTION);
- }
-
- void closeWithError(@Nonnegative long endTimeNanos, @Nonnull Throwable error);
-
- default void closeCompletion(@Nonnegative long endTimeNanos, T value, Throwable error) {
- if (error == null) {
- close(endTimeNanos);
- } else {
- closeWithError(endTimeNanos, error);
- }
- }
-
- static CallCompletion combine(CallCompletion... completions) {
- CallCompletion[] array = completions.clone();
- return new CallCompletion() {
- @Override
- public void close(long endTimeNanos) {
- for (CallCompletion cc: array) {
- if (cc != null) {
- cc.close(endTimeNanos);
- }
- }
- }
-
- @Override
- public void closeWithError(long endTimeNanos, @Nonnull Throwable error) {
- for (CallCompletion cc: array) {
- if (cc != null) {
- cc.closeWithError(endTimeNanos, error);
- }
- }
- }
- };
- }
-}
diff --git a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/monitoring/CallTracker.java b/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/monitoring/CallTracker.java
deleted file mode 100644
index f4c7ae37a06..00000000000
--- a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/monitoring/CallTracker.java
+++ /dev/null
@@ -1,143 +0,0 @@
-package com.linkedin.alpini.base.monitoring;
-
-import com.linkedin.alpini.base.misc.ExceptionUtil;
-import com.linkedin.alpini.base.misc.Time;
-import com.linkedin.alpini.base.statistics.LongStats;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.CheckReturnValue;
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-
-
-/**
- * @author Antony T Curtis {@literal }
- */
-public interface CallTracker {
- Exception GENERIC_EXCEPTION = ExceptionUtil.withoutStackTrace(new Exception());
-
- @Nonnull
- @CheckReturnValue
- default CallCompletion startCall() {
- return startCall(Time.nanoTime());
- }
-
- @Nonnull
- @CheckReturnValue
- CallCompletion startCall(@Nonnegative long startTimeNanos);
-
- default void trackCall(long duration) {
- trackCall(duration, TimeUnit.MILLISECONDS);
- }
-
- default void trackCall(long duration, @Nonnull TimeUnit timeUnit) {
- trackCallWithError(duration, timeUnit, null);
- }
-
- default void trackCallWithError(long duration) {
- trackCallWithError(duration, TimeUnit.MILLISECONDS);
- }
-
- default void trackCallWithError(long duration, @Nonnull TimeUnit timeUnit) {
- trackCallWithError(duration, timeUnit, GENERIC_EXCEPTION);
- }
-
- default void trackCallWithError(long duration, Throwable throwable) {
- trackCallWithError(duration, TimeUnit.MILLISECONDS, throwable);
- }
-
- void trackCallWithError(long duration, @Nonnull TimeUnit timeUnit, Throwable throwable);
-
- long getCurrentStartCountTotal();
-
- long getCurrentCallCountTotal();
-
- long getCurrentErrorCountTotal();
-
- int getCurrentConcurrency();
-
- @Nonnull
- @CheckReturnValue
- double[] getAverageConcurrency();
-
- @Nonnull
- @CheckReturnValue
- int[] getMaxConcurrency();
-
- @Nonnull
- @CheckReturnValue
- int[] getStartFrequency();
-
- @Nonnull
- @CheckReturnValue
- long[] getStartCount();
-
- @Nonnull
- @CheckReturnValue
- int[] getErrorFrequency();
-
- @Nonnull
- @CheckReturnValue
- long[] getErrorCount();
-
- @Nonnull
- @CheckReturnValue
- CallStats getCallStats();
-
- void reset();
-
- long getLastResetTime();
-
- long getTimeSinceLastStartCall();
-
- interface CallStats {
- long getCallCountTotal();
-
- long getCallStartCountTotal();
-
- long getErrorCountTotal();
-
- int getConcurrency();
-
- double getAverageConcurrency1min();
-
- double getAverageConcurrency5min();
-
- double getAverageConcurrency15min();
-
- int getMaxConcurrency1min();
-
- int getMaxConcurrency5min();
-
- int getMaxConcurrency15min();
-
- int getStartFrequency1min();
-
- int getStartFrequency5min();
-
- int getStartFrequency15min();
-
- int getErrorFrequency1min();
-
- int getErrorFrequency5min();
-
- int getErrorFrequency15min();
-
- long getOutstandingStartTimeAvg();
-
- int getOutstandingCount();
-
- /**
- * Returns the call time stats in nanoseconds.
- * @return {@linkplain LongStats} object.
- */
- LongStats getCallTimeStats();
- }
-
- static CallTracker create() {
- return new CallTrackerImpl();
- }
-
- static CallTracker nullTracker() {
- return NullCallTracker.INSTANCE;
- }
-}
diff --git a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/monitoring/CallTrackerImpl.java b/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/monitoring/CallTrackerImpl.java
deleted file mode 100644
index a03999d0a40..00000000000
--- a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/monitoring/CallTrackerImpl.java
+++ /dev/null
@@ -1,844 +0,0 @@
-package com.linkedin.alpini.base.monitoring;
-
-import com.linkedin.alpini.base.concurrency.ConcurrentAccumulator;
-import com.linkedin.alpini.base.misc.Time;
-import com.linkedin.alpini.base.statistics.AbstractQuantileEstimation;
-import com.linkedin.alpini.base.statistics.LongStats;
-import com.linkedin.alpini.base.statistics.LongStatsAggregator;
-import com.linkedin.alpini.base.statistics.LongStatsArrayAggregator;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerArray;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.function.UnaryOperator;
-import java.util.stream.Collector;
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-
-
-/**
- * Tracker for "load average" of a call.
- *
- * @author Antony T Curtis {@literal }
- */
-public class CallTrackerImpl implements CallTracker {
- private static final int SECONDS_PER_BUCKET = 1;
- private static final long NANOS_PER_BUCKET = TimeUnit.SECONDS.toNanos(SECONDS_PER_BUCKET);
- private static final int NUMBER_OF_BUCKETS = 15 * 60 / SECONDS_PER_BUCKET + 1;
- private static final long SUNSET_NANOS = NANOS_PER_BUCKET * NUMBER_OF_BUCKETS;
- private static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
-
- private static final double LONG_STATS_EPSILON = 0.0005;
- private static final int LONG_STATS_SAMPLES = 2000;
-
- private static final Stats NULL_STATS = new Stats();
-
- private final ReentrantLock _lock = new ReentrantLock();
-
- private int _lastIndex;
- private long _nextNanos = Time.nanoTime();
- private final long[] _nanosBuckets = new long[NUMBER_OF_BUCKETS];
- private final long[] _startBuckets = new long[NUMBER_OF_BUCKETS];
- private final long[] _errorBuckets = new long[NUMBER_OF_BUCKETS];
- private final int[] _maxConBuckets = new int[NUMBER_OF_BUCKETS];
-
- public enum Mode {
- SKIP_LIST {
- @Override
- LongStatsAggregator constructor(double epsilon, int samples) {
- return new LongStatsAggregator(epsilon, samples);
- }
- },
- ARRAY_LIST {
- @Override
- LongStatsAggregator constructor(double epsilon, int samples) {
- return new LongStatsArrayAggregator(epsilon, samples);
- }
- };
-
- abstract LongStatsAggregator constructor(double epsilon, int samples);
- }
-
- public static Mode defaultMode = Mode.SKIP_LIST;
-
- private final LongStatsAggregator _callTimeStatsAggregator =
- defaultMode.constructor(LONG_STATS_EPSILON, LONG_STATS_SAMPLES);
- private final AtomicIntegerArray _concurrencyArray;
- private final AtomicIntegerArray _maxConcurrencyArray;
-
- private final ConcurrentAccumulator _stats;
- private long _lastResetTime = Time.currentTimeMillis();
- private long _lastStartTime = 0L;
-
- private Predicate _testSuccessful = ignored -> false;
-
- public CallTrackerImpl() {
- this(Runtime.getRuntime().availableProcessors());
- }
-
- public CallTrackerImpl(@Nonnegative int ncpu) {
- this(ncpu, ConcurrentAccumulator.defaultMode);
- }
-
- public CallTrackerImpl(@Nonnegative int ncpu, @Nonnull ConcurrentAccumulator.Mode accumulatorMode) {
- _concurrencyArray = new AtomicIntegerArray(cpuBuckets(Math.max(1, Math.min(ncpu, 64))));
- _maxConcurrencyArray = new AtomicIntegerArray(_concurrencyArray.length());
- _stats = new ConcurrentAccumulator<>(accumulatorMode, Stats.COLLECTOR);
- }
-
- private static final ThreadLocal LOCAL = ThreadLocal.withInitial(LocalState::new);
-
- private static final class LocalState {
- private final Reset reset = new Reset();
- private final Start start = new Start();
- private final EndTimeSuccess endSuccess = new EndTimeSuccess();
- private final EndTimeFailure endFailure = new EndTimeFailure();
- private State last;
- private State result;
- private State idle;
-
- private Start start(long startTimeNanos) {
- start._startTimeNanos = startTimeNanos;
- return start;
- }
-
- private EndTimeSuccess endSuccess(long now, long startTimeNanos) {
- endSuccess.now = now;
- endSuccess.startTimeNanos = startTimeNanos;
- return endSuccess;
- }
-
- private EndTimeFailure endFailure(long now, long startTimeNanos) {
- endFailure.now = now;
- endFailure.startTimeNanos = startTimeNanos;
- return endFailure;
- }
-
- private Stats combine(Stats stats, Stats other) {
- return combine(state(stats), state(other));
- }
-
- private Stats combine(State state, State other) {
- state._started += other._started;
- state._completed += other._completed;
- state._completedWithError += other._completedWithError;
- state._nanosRunning += other._nanosRunning;
- state._startTimeSum += other._startTimeSum;
- state._concurrency += other._concurrency;
- idle = other;
- return new Stats(state);
- }
-
- void localDone() {
- result = null;
- if (idle == null) {
- idle = last;
- last = null;
- }
- }
-
- public State state(Stats stats) {
- State oldState = stats.get();
- for (;;) {
- State newState = copyStats(oldState);
- if (stats.compareAndSet(oldState, newState)) {
- break;
- }
- idle = newState;
- oldState = stats.get();
- }
- State last = this.last;
- this.last = null;
- localDone();
- return last;
- }
-
- private State copyStats(State state) {
- return result(state).recycle(
- state._started,
- state._completed,
- state._completedWithError,
- state._nanosRunning,
- state._startTimeSum,
- state._concurrency);
- }
-
- private State result(State state) {
- last = state;
- if (result == null) {
- if (idle == null) {
- result = new State();
- } else {
- result = idle;
- idle = null;
- }
- }
- return result;
- }
-
- private final class Reset extends Change {
- @Override
- public State apply(State state) {
- long adjust = Math.min(state._started, state._completed);
- return result(state).recycle(
- state._started - adjust,
- state._completed - adjust,
- 0,
- state._nanosRunning,
- state._startTimeSum,
- state._concurrency);
- }
-
- public void done() {
- localDone();
- }
- }
-
- private final class Start extends Change {
- private long _startTimeNanos;
-
- @Override
- public State apply(State state) {
- return result(state).recycle(
- state._started + 1,
- state._completed,
- state._completedWithError,
- state._nanosRunning - _startTimeNanos,
- state._startTimeSum + _startTimeNanos,
- state._concurrency + 1);
- }
-
- public void done() {
- localDone();
- }
- }
-
- private final class EndTimeSuccess extends Change {
- private long now;
- private long startTimeNanos;
-
- @Override
- public State apply(State state) {
- return result(state).recycle(
- state._started,
- state._completed + 1,
- state._completedWithError,
- state._nanosRunning + now,
- state._startTimeSum - startTimeNanos,
- state._concurrency - 1);
- }
-
- public void done() {
- localDone();
- }
- }
-
- private final class EndTimeFailure extends Change {
- private long now;
- private long startTimeNanos;
-
- @Override
- public State apply(State state) {
- return result(state).recycle(
- state._started,
- state._completed + 1,
- state._completedWithError + 1,
- state._nanosRunning + now,
- state._startTimeSum - startTimeNanos,
- state._concurrency - 1);
- }
-
- public void done() {
- localDone();
- }
- }
- }
-
- private static abstract class Change implements UnaryOperator {
- public abstract State apply(State state);
-
- public abstract void done();
- }
-
- private static final class State {
- long _started;
- long _completed;
- long _completedWithError;
- long _nanosRunning;
- long _startTimeSum;
- long _concurrency;
-
- private State recycle(
- long started,
- long completed,
- long completedWithError,
- long nanosRunning,
- long startTimeSum,
- long concurrency) {
- _started = started;
- _completed = completed;
- _completedWithError = completedWithError;
- _nanosRunning = nanosRunning;
- _startTimeSum = startTimeSum;
- _concurrency = concurrency;
- return this;
- }
-
- private int concurrency() {
- return Math.toIntExact(_concurrency);
- }
- }
-
- private static final class Stats extends AtomicReference {
- Stats() {
- this(new State());
- }
-
- Stats(State state) {
- super(state);
- }
-
- private static final Collector COLLECTOR = Collector.of(
- Stats::new,
- Stats::accumulate,
- Stats::combine,
- Function.identity(),
- Collector.Characteristics.UNORDERED,
- Collector.Characteristics.IDENTITY_FINISH);
-
- private Stats combine(Stats other) {
- return LOCAL.get().combine(this, other);
- }
-
- private void accumulate(Change change) {
- super.updateAndGet(change);
- change.done();
- }
-
- private State state() {
- return LOCAL.get().state(this);
- }
- }
-
- private void tick(final long now) {
- assert _lock.isHeldByCurrentThread();
- if (_nextNanos + SUNSET_NANOS < now) {
- _nextNanos = now - SUNSET_NANOS;
- _lastIndex = 0;
- }
- long nextNanos = _nextNanos;
- if (nextNanos <= now) {
- Stats stats = stats();
- int maxConcurrency = 0;
- for (int i = _maxConcurrencyArray.length() - 1; i >= 0; i--) {
- maxConcurrency += _maxConcurrencyArray.getAndSet(i, 0);
- }
- int index = _lastIndex;
- do {
- if (NUMBER_OF_BUCKETS == ++index) {
- index = 0;
- }
- State state = stats.state();
- _nanosBuckets[index] = state._nanosRunning + state._concurrency * nextNanos;
- _startBuckets[index] = state._started;
- _errorBuckets[index] = state._completedWithError;
- _maxConBuckets[index] = maxConcurrency;
- nextNanos += NANOS_PER_BUCKET;
- } while (nextNanos <= now);
- _lastIndex = index;
- _nextNanos = nextNanos;
- }
- }
-
- public void setTestSuccessful(@Nonnull Predicate test) {
- _testSuccessful = test;
- }
-
- protected boolean isSuccessfulException(@Nonnull Throwable exception) {
- return _testSuccessful.test(exception);
- }
-
- private boolean isSuccessful(Throwable exception) {
- return exception == null || exception != GENERIC_EXCEPTION && isSuccessfulException(exception);
- }
-
- private final class Completion extends AtomicReference implements CallCompletion {
- private final long _startTimeNanos;
- private final long _threadId = Thread.currentThread().getId();
- private final int _slot = foldUp(_threadId) & (_concurrencyArray.length() - 1);
-
- private Completion(long startTimeNanos) {
- this._startTimeNanos = startTimeNanos;
- set(this);
- }
-
- @Override
- public void close(@Nonnegative long now) {
- long duration = Math.max(0L, now - _startTimeNanos);
- LocalState local = LOCAL.get();
- close(local, now, duration, local.endSuccess(now, _startTimeNanos));
- }
-
- @Override
- public void closeWithError(@Nonnegative long endTimeNanos) {
- closeWithError(endTimeNanos, GENERIC_EXCEPTION);
- }
-
- @Override
- public void closeWithError(@Nonnegative long now, @Nonnull Throwable error) {
- long duration = Math.max(0L, now - _startTimeNanos);
- LocalState local = LOCAL.get();
- close(
- local,
- now,
- duration,
- isSuccessful(error) ? local.endSuccess(now, _startTimeNanos) : local.endFailure(now, _startTimeNanos));
- }
-
- private void close(LocalState local, long now, long duration, Change change) {
- if (!compareAndSet(this, null)) {
- return;
- }
- _stats.accept(change);
- _callTimeStatsAggregator.accept(duration);
- _concurrencyArray.getAndDecrement(_slot);
- checkTick(now);
- }
- }
-
- @Override
- @Nonnull
- public CallCompletion startCall(@Nonnegative long startTimeNanos) {
- _lastStartTime = Math.max(_lastStartTime, startTimeNanos);
-
- Completion cc = new Completion(startTimeNanos);
-
- checkTick(startTimeNanos);
-
- LocalState local = LOCAL.get();
- _stats.accept(local.start(startTimeNanos));
-
- int con = _concurrencyArray.incrementAndGet(cc._slot);
- if (con > _maxConcurrencyArray.get(cc._slot)) {
- _maxConcurrencyArray.accumulateAndGet(cc._slot, con, Math::max);
- }
-
- return cc;
- }
-
- @Override
- public void trackCallWithError(long duration, @Nonnull TimeUnit timeUnit, Throwable throwable) {
- long now = Time.nanoTime();
- duration = timeUnit.toNanos(duration);
- long startTimeNanos = now - duration;
-
- _lastStartTime = Math.max(_lastStartTime, startTimeNanos);
- LocalState local = LOCAL.get();
- _stats.accept(local.start(startTimeNanos));
-
- _stats.accept(
- isSuccessful(throwable) ? local.endSuccess(now, startTimeNanos) : local.endFailure(now, startTimeNanos));
-
- _callTimeStatsAggregator.accept(duration);
-
- checkTick(now);
- }
-
- private void checkTick(long now) {
- if (_nextNanos <= now && _lock.tryLock()) {
- try {
- tick(now);
- } finally {
- _lock.unlock();
- }
- }
- }
-
- @Nonnull
- private Stats stats() {
- Stats stats = _stats.get();
- if (stats == null) {
- stats = NULL_STATS;
- }
- return stats;
- }
-
- private long getTotalRuntimeNanos(State state, long now) {
- return state._nanosRunning + state._concurrency * now;
- }
-
- public long getTotalRuntimeNanos() {
- return getTotalRuntimeNanos(stats().state(), Time.nanoTime());
- }
-
- @Override
- public long getCurrentStartCountTotal() {
- return stats().state()._started;
- }
-
- @Override
- public long getCurrentCallCountTotal() {
- return stats().state()._completed;
- }
-
- @Override
- public long getCurrentErrorCountTotal() {
- return stats().state()._completedWithError;
- }
-
- @Override
- public int getCurrentConcurrency() {
- return stats().state().concurrency();
- }
-
- private static int calcIndex(int pos, int buckets) {
- pos -= buckets - 1;
- if (pos < 0) {
- pos += NUMBER_OF_BUCKETS;
- }
- return pos;
- }
-
- /**
- * Returns the average concurrency over a period of time.
- * @return array of 1, 5 and 15 minute average concurrency.
- */
- @Override
- @Nonnull
- public double[] getAverageConcurrency() {
- long now = Time.nanoTime();
- _lock.lock();
- try {
- tick(now);
- return getAverageConcurrency(now, stats().state());
- } finally {
- _lock.unlock();
- }
- }
-
- private double[] getAverageConcurrency(long now, State state) {
- assert _lock.isHeldByCurrentThread();
- long delta1;
- long delta5;
- long delta15;
-
- long currentTotal = getTotalRuntimeNanos(state, now);
-
- int lastIndex = _lastIndex;
-
- int index1 = calcIndex(lastIndex, 60 / SECONDS_PER_BUCKET);
- int index5 = calcIndex(lastIndex, 300 / SECONDS_PER_BUCKET);
- int index15 = calcIndex(lastIndex, 900 / SECONDS_PER_BUCKET);
-
- delta1 = currentTotal - _nanosBuckets[index1];
- delta5 = currentTotal - _nanosBuckets[index5];
- delta15 = currentTotal - _nanosBuckets[index15];
-
- return new double[] { ((double) delta1) / (60 * NANOS_PER_SECOND), ((double) delta5) / (300 * NANOS_PER_SECOND),
- ((double) delta15) / (900 * NANOS_PER_SECOND) };
- }
-
- @Override
- @Nonnull
- public int[] getMaxConcurrency() {
- checkTick(Time.nanoTime());
- return getMaxConcurrency(stats().state());
- }
-
- private int[] getMaxConcurrency(State state) {
- int lastIndex = _lastIndex;
-
- int index1 = calcIndex(lastIndex, 60 / SECONDS_PER_BUCKET);
- int index5 = calcIndex(lastIndex, 300 / SECONDS_PER_BUCKET);
- int index15 = calcIndex(lastIndex, 900 / SECONDS_PER_BUCKET);
-
- int index = lastIndex;
-
- int concurrency1 = 0;
- for (; index != index1; index = (index == 0 ? NUMBER_OF_BUCKETS : index) - 1) {
- concurrency1 = Math.max(concurrency1, _maxConBuckets[index]);
- }
-
- int concurrency5 = concurrency1;
- for (; index != index5; index = (index == 0 ? NUMBER_OF_BUCKETS : index) - 1) {
- concurrency5 = Math.max(concurrency5, _maxConBuckets[index]);
- }
-
- int concurrency15 = concurrency5;
- for (; index != index15; index = (index == 0 ? NUMBER_OF_BUCKETS : index) - 1) {
- concurrency15 = Math.max(concurrency15, _maxConBuckets[index]);
- }
-
- return new int[] { concurrency1, concurrency5, concurrency15 };
- }
-
- private static int div(long numerator, int denominator) {
- return (int) ((numerator + denominator - 1) / denominator);
- }
-
- @Override
- @Nonnull
- public int[] getStartFrequency() {
- checkTick(Time.nanoTime());
- return getStartFrequency(stats().state());
- }
-
- @Override
- @Nonnull
- public long[] getStartCount() {
- checkTick(Time.nanoTime());
- return getStartCount(stats().state());
- }
-
- private int[] getStartFrequency(State state) {
- return getFrequency(state._started, _startBuckets);
- }
-
- private long[] getStartCount(State state) {
- return getDiffCount(state._started, _startBuckets);
- }
-
- @Override
- @Nonnull
- public int[] getErrorFrequency() {
- checkTick(Time.nanoTime());
- return getErrorFrequency(stats().state());
- }
-
- public long[] getErrorCount() {
- checkTick(Time.nanoTime());
- return getErrorCount(stats().state());
- }
-
- private int[] getErrorFrequency(State state) {
- return getFrequency(state._completedWithError, _errorBuckets);
- }
-
- private long[] getErrorCount(State state) {
- return getDiffCount(state._completedWithError, _errorBuckets);
- }
-
- private int[] getFrequency(long currentTotal, long[] buckets) {
- return getFrequency0(currentTotal, buckets);
- }
-
- private long[] getDiffCount(long currentTotal, long[] buckets) {
- int lastIndex = _lastIndex;
- int index1 = calcIndex(lastIndex, 60 / SECONDS_PER_BUCKET);
- int index5 = calcIndex(lastIndex, 300 / SECONDS_PER_BUCKET);
- int index15 = calcIndex(lastIndex, 900 / SECONDS_PER_BUCKET);
- long value1 = currentTotal - buckets[index1];
- long value5 = currentTotal - buckets[index5];
- long value15 = currentTotal - buckets[index15];
- return new long[] { value1, value5, value15 };
- }
-
- private int[] getFrequency0(long currentTotal, long[] buckets) {
- long[] diffCounts = getDiffCount(currentTotal, buckets);
- int value1 = div(diffCounts[0], 60);
- int value5 = div(diffCounts[1], 300);
- int value15 = div(diffCounts[2], 900);
- return new int[] { value1, value5, value15 };
- }
-
- @Override
- @Nonnull
- public CallStats getCallStats() {
- long now = Time.nanoTime();
- _lock.lock();
- try {
- tick(now);
- return new CallStatsImpl(now, this, stats().state(), _callTimeStatsAggregator.getLongStats());
- } finally {
- _lock.unlock();
- }
- }
-
- public AbstractQuantileEstimation.Quantile computeQuantile(long sample, @Nonnull TimeUnit timeUnit) {
- return _callTimeStatsAggregator.computeQuantile(timeUnit.toNanos(sample));
- }
-
- @Override
- public void reset() {
- _lastResetTime = Time.currentTimeMillis();
- LocalState local = LOCAL.get();
- _stats.accept(local.reset);
- _callTimeStatsAggregator.reset();
- }
-
- @Override
- public long getLastResetTime() {
- return _lastResetTime;
- }
-
- @Override
- public long getTimeSinceLastStartCall() {
- return _lastStartTime == 0L ? 0L : TimeUnit.NANOSECONDS.toMillis(Time.nanoTime() - _lastStartTime);
- }
-
- private static int cpuBuckets(int proc) {
- proc--;
- proc |= proc >>> 1;
- proc |= proc >>> 2;
- proc |= proc >>> 4;
- proc |= proc >>> 8;
- proc |= proc >>> 16;
- return Math.max(1, ++proc);
- }
-
- private static int foldUp(long value) {
- return (int) (0xffL & (value ^ (value >>> 8) ^ (value >>> 16) ^ (value >>> 24) ^ (value >>> 32) ^ (value >>> 40)
- ^ (value >>> 48) ^ (value >>> 52)));
- }
-
- private static final class CallStatsImpl implements CallStats {
- private final long _callCountTotal;
- private final long _callStartCountTotal;
- private final long _errorCountTotal;
- private final int _currentConcurrency;
- private final double[] _concurrencyAvg;
- private final int[] _maxConcurrency;
- private final int[] _errorFrequency;
- private final int[] _startFrequency;
- private final long _outstandingStartTimes;
- private final LongStats _callTimeStats;
-
- private CallStatsImpl(long now, CallTrackerImpl callTracker, State state, LongStats callTimeStats) {
- _callCountTotal = state._completed;
- _callStartCountTotal = state._started;
- _errorCountTotal = state._completedWithError;
- _currentConcurrency = state.concurrency();
- _concurrencyAvg = callTracker.getAverageConcurrency(now, state);
- _maxConcurrency = callTracker.getMaxConcurrency(state);
- _errorFrequency = callTracker.getErrorFrequency(state);
- _startFrequency = callTracker.getStartFrequency(state);
- _outstandingStartTimes = state._startTimeSum;
- _callTimeStats = callTimeStats;
- }
-
- @Override
- public long getCallCountTotal() {
- return _callCountTotal;
- }
-
- @Override
- public long getCallStartCountTotal() {
- return _callStartCountTotal;
- }
-
- @Override
- public long getErrorCountTotal() {
- return _errorCountTotal;
- }
-
- @Override
- public int getConcurrency() {
- return _currentConcurrency;
- }
-
- @Override
- public double getAverageConcurrency1min() {
- return _concurrencyAvg[0];
- }
-
- @Override
- public double getAverageConcurrency5min() {
- return _concurrencyAvg[1];
- }
-
- @Override
- public double getAverageConcurrency15min() {
- return _concurrencyAvg[2];
- }
-
- @Override
- public long getOutstandingStartTimeAvg() {
- int outstanding = getConcurrency();
- return outstanding > 0 ? _outstandingStartTimes / outstanding : 0;
- }
-
- @Override
- public int getOutstandingCount() {
- return getConcurrency();
- }
-
- @Override
- public int getMaxConcurrency1min() {
- return _maxConcurrency[0];
- }
-
- @Override
- public int getMaxConcurrency5min() {
- return _maxConcurrency[1];
- }
-
- @Override
- public int getMaxConcurrency15min() {
- return _maxConcurrency[2];
- }
-
- @Override
- public int getErrorFrequency1min() {
- return _errorFrequency[0];
- }
-
- @Override
- public int getErrorFrequency5min() {
- return _errorFrequency[1];
- }
-
- @Override
- public int getErrorFrequency15min() {
- return _errorFrequency[2];
- }
-
- @Override
- public int getStartFrequency1min() {
- return _startFrequency[0];
- }
-
- @Override
- public int getStartFrequency5min() {
- return _startFrequency[1];
- }
-
- @Override
- public int getStartFrequency15min() {
- return _startFrequency[2];
- }
-
- @Override
- public LongStats getCallTimeStats() {
- return _callTimeStats;
- }
-
- @Override
- public String toString() {
- return String.format(
- "callCountTotal=%d startCountTotal=%d errorCountTotal=%d"
- + " concurrency=%d concurrencyAvg=%.3f,%.3f,%.3f concurrencyMax=%d,%d,%d"
- + " startFrequency=%d,%d,%d errorFrequency=%d,%d,%d" + " outstanding=%d outstandingStartTimeAvg=%d %s",
- getCallCountTotal(),
- getCallStartCountTotal(),
- getErrorCountTotal(),
- getConcurrency(),
- getAverageConcurrency1min(),
- getAverageConcurrency5min(),
- getAverageConcurrency15min(),
- getMaxConcurrency1min(),
- getMaxConcurrency5min(),
- getMaxConcurrency15min(),
- getStartFrequency1min(),
- getStartFrequency5min(),
- getStartFrequency15min(),
- getErrorFrequency1min(),
- getErrorFrequency5min(),
- getErrorFrequency15min(),
- getOutstandingCount(),
- getOutstandingStartTimeAvg(),
- _callTimeStats);
- }
- }
-}
diff --git a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/monitoring/NullCallTracker.java b/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/monitoring/NullCallTracker.java
deleted file mode 100644
index cc1f9974a71..00000000000
--- a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/monitoring/NullCallTracker.java
+++ /dev/null
@@ -1,263 +0,0 @@
-package com.linkedin.alpini.base.monitoring;
-
-import com.linkedin.alpini.base.statistics.LongStats;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-
-
-/**
- * Created by acurtis on 3/30/17.
- */
-public final class NullCallTracker implements CallTracker {
- public static final CallTracker INSTANCE = new NullCallTracker();
-
- @Nonnull
- @Override
- public CallCompletion startCall(@Nonnegative long startTimeNanos) {
- return CALL_COMPLETION;
- }
-
- @Override
- public void trackCall(long duration, @Nonnull TimeUnit timeUnit) {
- }
-
- @Override
- public void trackCallWithError(long duration, @Nonnull TimeUnit timeUnit, Throwable throwable) {
- }
-
- @Override
- public long getCurrentStartCountTotal() {
- return 0;
- }
-
- @Override
- public long getCurrentCallCountTotal() {
- return 0;
- }
-
- @Override
- public long getCurrentErrorCountTotal() {
- return 0;
- }
-
- @Override
- public int getCurrentConcurrency() {
- return 0;
- }
-
- @Nonnull
- @Override
- public double[] getAverageConcurrency() {
- return new double[0];
- }
-
- @Nonnull
- @Override
- public int[] getMaxConcurrency() {
- return new int[0];
- }
-
- @Nonnull
- @Override
- public int[] getStartFrequency() {
- return new int[0];
- }
-
- @Nonnull
- @Override
- public long[] getStartCount() {
- return new long[0];
- }
-
- @Nonnull
- @Override
- public int[] getErrorFrequency() {
- return new int[0];
- }
-
- @Nonnull
- @Override
- public long[] getErrorCount() {
- return new long[0];
- }
-
- @Nonnull
- @Override
- public CallStats getCallStats() {
- return CALL_STATS;
- }
-
- @Override
- public void reset() {
- }
-
- @Override
- public long getLastResetTime() {
- return 0;
- }
-
- @Override
- public long getTimeSinceLastStartCall() {
- return 0;
- }
-
- private static final CallCompletion CALL_COMPLETION = new CallCompletion() {
- @Override
- public void close(@Nonnegative long endTimeNanos) {
- }
-
- @Override
- public void closeWithError(@Nonnegative long endTimeNanos, @Nonnull Throwable error) {
- }
- };
-
- private static final CallStats CALL_STATS = new CallStats() {
- @Override
- public long getCallCountTotal() {
- return 0;
- }
-
- @Override
- public long getCallStartCountTotal() {
- return 0;
- }
-
- @Override
- public long getErrorCountTotal() {
- return 0;
- }
-
- @Override
- public int getConcurrency() {
- return 0;
- }
-
- @Override
- public double getAverageConcurrency1min() {
- return 0;
- }
-
- @Override
- public double getAverageConcurrency5min() {
- return 0;
- }
-
- @Override
- public double getAverageConcurrency15min() {
- return 0;
- }
-
- @Override
- public int getMaxConcurrency1min() {
- return 0;
- }
-
- @Override
- public int getMaxConcurrency5min() {
- return 0;
- }
-
- @Override
- public int getMaxConcurrency15min() {
- return 0;
- }
-
- @Override
- public int getStartFrequency1min() {
- return 0;
- }
-
- @Override
- public int getStartFrequency5min() {
- return 0;
- }
-
- @Override
- public int getStartFrequency15min() {
- return 0;
- }
-
- @Override
- public int getErrorFrequency1min() {
- return 0;
- }
-
- @Override
- public int getErrorFrequency5min() {
- return 0;
- }
-
- @Override
- public int getErrorFrequency15min() {
- return 0;
- }
-
- @Override
- public long getOutstandingStartTimeAvg() {
- return 0;
- }
-
- @Override
- public int getOutstandingCount() {
- return 0;
- }
-
- @Override
- public LongStats getCallTimeStats() {
- return LONG_STATS;
- }
- };
-
- private static final LongStats LONG_STATS = new LongStats() {
- @Override
- public long getLongCount() {
- return 0;
- }
-
- @Override
- public double getAverage() {
- return 0;
- }
-
- @Override
- public double getStandardDeviation() {
- return 0;
- }
-
- @Override
- public Long getMinimum() {
- return null;
- }
-
- @Override
- public Long getMaximum() {
- return null;
- }
-
- @Override
- public Long get50Pct() {
- return null;
- }
-
- @Override
- public Long get90Pct() {
- return null;
- }
-
- @Override
- public Long get95Pct() {
- return null;
- }
-
- @Override
- public Long get99Pct() {
- return null;
- }
-
- @Override
- public Long get99_9Pct() {
- return null;
- }
- };
-}
diff --git a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/monitoring/package-info.java b/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/monitoring/package-info.java
deleted file mode 100644
index c6db4c83d25..00000000000
--- a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/monitoring/package-info.java
+++ /dev/null
@@ -1,4 +0,0 @@
-/**
- * @author Antony T Curtis {@literal }
- */
-package com.linkedin.alpini.base.monitoring;
diff --git a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/AsyncPool.java b/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/AsyncPool.java
deleted file mode 100644
index f42156c710f..00000000000
--- a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/AsyncPool.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package com.linkedin.alpini.base.pool;
-
-import com.linkedin.alpini.base.pool.impl.AsyncPoolImpl;
-import com.linkedin.alpini.base.pool.impl.RateLimitedCreateLifeCycle;
-import com.linkedin.alpini.base.registry.Shutdownable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-
-
-/**
- * @author Antony T Curtis {@literal }
- */
-public interface AsyncPool {
- void start();
-
- int size();
-
- CompletableFuture acquire();
-
- void release(T entity);
-
- void dispose(T entity);
-
- default CompletableFuture shutdownPool() {
- if (this instanceof Shutdownable) {
- ((Shutdownable) this).shutdown();
- return CompletableFuture.supplyAsync(() -> {
- try {
- ((Shutdownable) this).waitForShutdown();
- return null;
- } catch (InterruptedException e) {
- throw new CompletionException(e);
- }
- });
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }
-
- interface LifeCycle {
- CompletableFuture create();
-
- CompletableFuture testOnRelease(T entity);
-
- CompletableFuture testAfterIdle(T entity);
-
- CompletableFuture destroy(T entity);
-
- default CompletableFuture shutdown() {
- return CompletableFuture.completedFuture(null);
- }
-
- default W unwrap(Class iface) {
- if (isWrapperFor(iface)) {
- return iface.cast(this);
- } else {
- throw new IllegalArgumentException();
- }
- }
-
- default boolean isWrapperFor(Class> iface) {
- return iface.isAssignableFrom(getClass());
- }
- }
-
- PoolStats getPoolStats();
-
- static AsyncPool create(
- @Nonnull LifeCycle lifeCycle,
- @Nonnull Executor executor,
- int maxConcurrentCreate,
- int maxWaiters,
- int minimumEntities,
- int maximumEntities,
- long maxIdleTime,
- @Nonnull TimeUnit maxIdleUnit) {
- return new AsyncPoolImpl<>(
- lifeCycle,
- executor,
- maxConcurrentCreate,
- maxWaiters,
- minimumEntities,
- maximumEntities,
- maxIdleTime,
- maxIdleUnit);
- }
-
- static LifeCycle rateLimitCreate(
- @Nonnull AsyncPool.LifeCycle lifeCycle,
- @Nonnull ScheduledExecutorService executor,
- long minimumTimeDelay,
- long maximumTimeDelay,
- long timeIncrement,
- @Nonnull TimeUnit unit) {
- return new RateLimitedCreateLifeCycle<>(
- lifeCycle,
- executor,
- minimumTimeDelay,
- maximumTimeDelay,
- timeIncrement,
- unit);
- }
-
-}
diff --git a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/AsyncQOSPool.java b/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/AsyncQOSPool.java
deleted file mode 100644
index 02404788e3e..00000000000
--- a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/AsyncQOSPool.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.linkedin.alpini.base.pool;
-
-import com.linkedin.alpini.base.pool.impl.AsyncQOSPoolImpl;
-import com.linkedin.alpini.base.queuing.QOSPolicy;
-import com.linkedin.alpini.consts.QOS;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * @author Antony T Curtis {@literal }
- */
-public interface AsyncQOSPool extends AsyncPool {
- CompletableFuture acquire(String queueName, QOS qos);
-
- static AsyncQOSPool create(
- LifeCycle lifeCycle,
- QOSPolicy.StaticConfig qosPolicyConfig,
- Executor executor,
- int maxConcurrentCreate,
- int minimumEntities,
- int maximumEntities,
- long maxIdleTime,
- TimeUnit maxIdleUnit) {
- return new AsyncQOSPoolImpl<>(
- lifeCycle,
- qosPolicyConfig,
- executor,
- maxConcurrentCreate,
- minimumEntities,
- maximumEntities,
- maxIdleTime,
- maxIdleUnit);
- }
-}
diff --git a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/PoolStats.java b/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/PoolStats.java
deleted file mode 100644
index d2c577220aa..00000000000
--- a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/PoolStats.java
+++ /dev/null
@@ -1,194 +0,0 @@
-package com.linkedin.alpini.base.pool;
-
-/**
- * @author Antony T Curtis {@literal }
- */
-public interface PoolStats {
- /**
- * Get the total number of pool objects created between
- * the starting of the Pool and the call to getStats().
- * Does not include create errors.
- * @return The total number of pool objects created
- */
- long getTotalCreated();
-
- /**
- * Get the total number of pool objects destroyed between
- * the starting of the Pool and the call to getStats().
- * Includes lifecycle validation failures, disposes,
- * and timed-out objects, but does not include destroy errors.
- * @return The total number of pool objects destroyed
- */
- long getTotalDestroyed();
-
- /**
- * Get the total number of lifecycle create errors between
- * the starting of the Pool and the call to getStats().
- * @return The total number of create errors
- */
- long getTotalCreateErrors();
-
- /**
- * Get the total number of lifecycle destroy errors between
- * the starting of the Pool and the call to getStats().
- * @return The total number of destroy errors
- */
- long getTotalDestroyErrors();
-
- /**
- * Get the number of pool objects checked out at the time of
- * the call to getStats().
- * @return The number of checked out pool objects
- */
- double getCheckedOut1min();
-
- double getCheckedOut5min();
-
- double getCheckedOut15min();
-
- int getMaxCheckedOut1min();
-
- double getCheckedOutTimeAvg();
-
- double getCheckedOutTime50Pct();
-
- double getCheckedOutTime95Pct();
-
- double getCheckedOutTime99Pct();
-
- /**
- * Get the configured maximum pool size.
- * @return The maximum pool size
- */
- int getMaxPoolSize();
-
- /**
- * Get the configured minimum pool size.
- * @return The minimum pool size
- */
- int getMinPoolSize();
-
- /**
- * Get the pool size at the time of the call to getStats().
- * @return The pool size
- */
- int getPoolSize();
-
- /**
- * Get the number of objects that are idle(not checked out)
- * in the pool.
- * @return The number of idle objects
- */
- int getIdleCount();
-
- double getWaiters1min();
-
- double getWaiters5min();
-
- double getWaiters15min();
-
- /**
- * Get the average wait time to get a pooled object.
- * @return The average wait time.
- */
- double getWaitTimeAvg();
-
- /**
- * Get the 50 percentage wait time to get a pooled object.
- * @return 50 percentage wait time.
- */
- double getWaitTime50Pct();
-
- /**
- * Get the 95 percentage wait time to get a pooled object.
- * @return 95 percentage wait time.
- */
- double getWaitTime95Pct();
-
- /**
- * Get the 99 percentage wait time to get a pooled object.
- * @return 99 percentage wait time.
- */
- double getWaitTime99Pct();
-
- /**
- * Get stats collected from {@link AsyncPool.LifeCycle}
- * @return Lifecycle stats
- */
- LifeCycleStats getLifecycleStats();
-
- interface LifeCycleStats {
- /**
- * Get the average time to create an object.
- * @return The average create time.
- */
- double getCreateTimeAvg();
-
- /**
- * Get the 50 percentage time to create an object.
- * @return 50 percentage create time.
- */
- double getCreateTime50Pct();
-
- /**
- * Get the 95 percentage time to create an object.
- * @return 95 percentage create time.
- */
- double getCreateTime95Pct();
-
- /**
- * Get the 99 percentage time to create an object.
- * @return 99 percentage create time.
- */
- double getCreateTime99Pct();
-
- /**
- * Get the average time to create an object.
- * @return The average create time.
- */
- double getTestTimeAvg();
-
- /**
- * Get the 50 percentage time to create an object.
- * @return 50 percentage create time.
- */
- double getTestTime50Pct();
-
- /**
- * Get the 95 percentage time to create an object.
- * @return 95 percentage create time.
- */
- double getTestTime95Pct();
-
- /**
- * Get the 99 percentage time to create an object.
- * @return 99 percentage create time.
- */
- double getTestTime99Pct();
-
- /**
- * Get the average time to create an object.
- * @return The average create time.
- */
- double getDestroyTimeAvg();
-
- /**
- * Get the 50 percentage time to create an object.
- * @return 50 percentage create time.
- */
- double getDestroyTime50Pct();
-
- /**
- * Get the 95 percentage time to create an object.
- * @return 95 percentage create time.
- */
- double getDestroyTime95Pct();
-
- /**
- * Get the 99 percentage time to create an object.
- * @return 99 percentage create time.
- */
- double getDestroyTime99Pct();
-
- }
-}
diff --git a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/AsyncPoolImpl.java b/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/AsyncPoolImpl.java
deleted file mode 100644
index ee280407ab6..00000000000
--- a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/AsyncPoolImpl.java
+++ /dev/null
@@ -1,656 +0,0 @@
-package com.linkedin.alpini.base.pool.impl;
-
-import com.linkedin.alpini.base.misc.Time;
-import com.linkedin.alpini.base.monitoring.CallCompletion;
-import com.linkedin.alpini.base.monitoring.CallTracker;
-import com.linkedin.alpini.base.pool.AsyncPool;
-import com.linkedin.alpini.base.pool.PoolStats;
-import com.linkedin.alpini.base.registry.ShutdownableResource;
-import com.linkedin.alpini.base.statistics.LongStats;
-import java.util.IdentityHashMap;
-import java.util.Objects;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import javax.annotation.Nonnull;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-
-/**
- * @author Antony T Curtis {@literal }
- */
-public class AsyncPoolImpl implements AsyncPool, ShutdownableResource {
- private static final Logger LOG = LogManager.getLogger(AsyncPoolImpl.class);
-
- private final LifeCycle _lifeCycle;
- private final ConcurrentLinkedQueue _idleQueue;
- private final ConcurrentLinkedQueue> _waiters;
- private final IdentityHashMap _checkedOut;
- private final AtomicInteger _totalEntities;
- private final AtomicInteger _totalWaiters;
- private final CallTracker _waitersCallTracker;
- private final CallTracker _checkedOutCallTracker;
- private final int _minimumEntities;
- private final int _maximumEntities;
- private final long _maxIdleTime;
- private final int _maxWaiters;
- private final Executor _executor;
- private final Semaphore _concurrentCreate;
- private CompletableFuture _shutdown;
- private CompletableFuture _shutdownCompleted;
- private ShutdownException _shutdownException;
-
- public AsyncPoolImpl(
- LifeCycle lifeCycle,
- Executor executor,
- int maxConcurrentCreate,
- int maxWaiters,
- int minimumEntities,
- int maximumEntities,
- long maxIdleTime,
- TimeUnit maxIdleUnit) {
- _lifeCycle = Objects.requireNonNull(lifeCycle, "lifeCycle");
- _executor = Objects.requireNonNull(executor, "executor");
- checkLowerBound(maxConcurrentCreate, 1, "maxConcurrentCreate");
- _maxWaiters = checkLowerBound(maxWaiters, 1, "maxWaiters");
- _maxIdleTime = Objects.requireNonNull(maxIdleUnit, "maxIdleUnit").toNanos(maxIdleTime);
- if (_maxIdleTime < 1) {
- throw new IllegalArgumentException("maxIdleTime");
- }
- _minimumEntities = checkLowerBound(minimumEntities, 0, "minimumEntries");
- _maximumEntities = checkLowerBound(maximumEntities, minimumEntities, "maximumEntries");
- _idleQueue = new ConcurrentLinkedQueue<>();
- _waiters = new ConcurrentLinkedQueue<>();
- _checkedOut = new IdentityHashMap<>();
- _totalEntities = new AtomicInteger();
- _totalWaiters = new AtomicInteger();
- _waitersCallTracker = createCallTracker();
- _checkedOutCallTracker = createCallTracker();
- _concurrentCreate = new Semaphore(maxConcurrentCreate);
- }
-
- protected CallTracker createCallTracker() {
- return CallTracker.create();
- }
-
- private static int checkLowerBound(int value, int bound, String message) {
- if (value < bound) {
- throw new IllegalArgumentException(message);
- }
- return value;
- }
-
- @Override
- public void start() {
- if (_shutdown == null && _totalEntities.get() < _minimumEntities && _concurrentCreate.tryAcquire()) {
- _totalEntities.incrementAndGet();
- create();
- }
- }
-
- protected final CallCompletion startWaiters() {
- return _waitersCallTracker.startCall();
- }
-
- protected CompletableFuture checkout(@Nonnull CompletableFuture future, @Nonnull CallCompletion waiter) {
- return future.whenComplete((entity, ex) -> {
- if (entity != null) {
- CallCompletion checkout = _checkedOutCallTracker.startCall();
- CallCompletion cc;
- synchronized (_checkedOut) {
- cc = _checkedOut.put(entity, checkout);
- }
- if (cc != null) {
- LOG.warn("Entry already checked out {}", entity);
- cc.closeWithError();
- }
- }
- waiter.closeCompletion(entity, ex);
- });
- }
-
- @Override
- public CompletableFuture acquire() {
- return checkout(acquire0(), startWaiters());
- }
-
- protected final CompletableFuture acquire0() {
- long now = Time.currentTimeMillis();
- IdleEntity idle;
- while ((idle = _idleQueue.poll()) != null) {
- if (idle._idleNanos + _maxIdleTime < now) {
- T entity = idle.get();
- CompletableFuture.supplyAsync(idle, _executor)
- .thenCompose(_lifeCycle::testAfterIdle)
- .thenAccept(success -> afterTest(success, entity));
- } else {
- return CompletableFuture.completedFuture(idle.get());
- }
- }
- CompletableFuture future = new CompletableFuture<>();
- if (_shutdown != null) {
- synchronized (this) {
- future.obtrudeException(_shutdownException);
- return future;
- }
- }
- if (_waiters.add(future)) {
- int totalWaiters = _totalWaiters.incrementAndGet();
-
- outer: while (!future.isDone() && (idle = _idleQueue.poll()) != null) { // SUPPRESS CHECKSTYLE InnerAssignment
- if (idle._idleNanos + _maxIdleTime < now) {
- T entity = idle.get();
- CompletableFuture.supplyAsync(idle, _executor)
- .thenCompose(_lifeCycle::testAfterIdle)
- .thenAccept(success -> afterTest(success, entity));
- } else {
- CompletableFuture waiter;
- while ((waiter = _waiters.poll()) != future) {
- _totalWaiters.getAndDecrement();
- if (waiter.complete(idle.get())) {
- continue outer;
- }
- }
- future.obtrudeValue(idle.get());
- return future;
- }
- }
- if (_idleQueue.isEmpty() && _totalEntities.get() < _maximumEntities && _concurrentCreate.tryAcquire()) {
- _totalEntities.incrementAndGet();
- create();
- }
-
- if (totalWaiters > _maxWaiters) {
- CompletableFuture waiter;
- int rejected = 0;
- TooManyWaitersException rejectedException = new TooManyWaitersException();
- while (totalWaiters + rejected > _maximumEntities && (waiter = _waiters.poll()) != null) { // SUPPRESS
- // CHECKSTYLE
- // InnerAssignment
- rejected--;
- waiter.completeExceptionally(rejectedException);
- }
- _totalWaiters.addAndGet(rejected);
- }
- } else {
- future.obtrudeException(new IllegalStateException());
- }
- return future;
- }
-
- private void afterTest(boolean success, T entity) {
- if (success && _shutdown == null) {
- CompletableFuture future;
- while ((future = _waiters.poll()) != null) {
- _totalWaiters.getAndDecrement();
- if (future.complete(entity)) {
- return;
- }
- }
- _idleQueue.add(new IdleEntity(entity));
- } else {
- dispose0(entity);
- }
- }
-
- @Override
- public void release(T entity) {
- synchronized (_checkedOut) {
- CallCompletion cc = _checkedOut.remove(entity);
- if (cc == null) {
- throw new IllegalStateException("Cannot release object which was not checked out");
- }
- cc.close();
- }
- release0(entity);
- }
-
- protected final void release0(T entity) {
- _lifeCycle.testOnRelease(Objects.requireNonNull(entity)).exceptionally(ex -> {
- LOG.warn("error in lifecycle testOnRelease", ex);
- return false;
- }).thenAccept(success -> afterTest(success, entity));
- }
-
- @Override
- public void dispose(T entity) {
- synchronized (_checkedOut) {
- CallCompletion cc = _checkedOut.remove(entity);
- if (cc == null) {
- throw new IllegalStateException("Cannot dispose object which was not checked out");
- }
- cc.closeWithError();
- }
- release0(entity);
- }
-
- protected final void dispose0(T entity) {
- _lifeCycle.destroy(Objects.requireNonNull(entity)).exceptionally(ex -> {
- LOG.warn("error in lifecycle destroy", ex);
- return null;
- }).thenRun(() -> {
- if (_shutdown == null && _minimumEntities > _totalEntities.get() - 1 && _concurrentCreate.tryAcquire()) {
- create();
- return;
- }
- int total = _totalEntities.decrementAndGet();
- if (_shutdown != null && total == 0) {
- _shutdown.complete(Boolean.TRUE);
- }
- });
- }
-
- private void create() {
- if (_shutdown != null) {
- _concurrentCreate.release();
- if (_totalEntities.decrementAndGet() == 0) {
- _shutdown.complete(Boolean.TRUE);
- }
- }
- _lifeCycle.create().whenComplete((newEntity, ex) -> {
- _concurrentCreate.release();
- if (ex != null) {
- LOG.warn("error in lifecycle create", ex);
- }
- }).thenAccept(newEntity -> {
- if (newEntity != null) {
- afterTest(true, newEntity);
- } else {
- int total = _totalEntities.decrementAndGet();
- if (_shutdown != null && total == 0) {
- _shutdown.complete(Boolean.TRUE);
- }
- }
- }).thenRun(this::start);
- }
-
- @Override
- public int size() {
- return _totalEntities.get();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isShutdown() {
- return _shutdown != null;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isTerminated() {
- return isShutdown() && _shutdown.isDone();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public synchronized void shutdown() {
- if (_shutdown == null) {
- _shutdownException = new ShutdownException();
- _shutdown = new CompletableFuture<>();
- _shutdownCompleted = _shutdown.thenCompose(ignore -> _lifeCycle.shutdown());
-
- CompletableFuture future;
- while ((future = _waiters.poll()) != null) {
- _totalWaiters.getAndDecrement();
- future.completeExceptionally(_shutdownException);
- }
-
- IdleEntity idle;
- while ((idle = _idleQueue.poll()) != null) {
- dispose0(idle.get());
- }
-
- if (_totalEntities.get() == 0) {
- _shutdown.complete(Boolean.TRUE);
- }
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public CompletableFuture shutdownPool() {
- shutdown();
- return _shutdownCompleted.thenApply(Function.identity());
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void waitForShutdown() throws InterruptedException, IllegalStateException {
- CompletableFuture shutdown = _shutdownCompleted;
- if (shutdown == null) {
- throw new IllegalStateException();
- } else {
- try {
- shutdown.get();
- } catch (ExecutionException e) {
- throw new InterruptedException(e.getCause().getMessage());
- }
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void waitForShutdown(long timeoutInMs) throws InterruptedException, IllegalStateException, TimeoutException {
- CompletableFuture shutdown = _shutdownCompleted;
- if (shutdown == null) {
- throw new IllegalStateException();
- } else {
- try {
- shutdown.get(timeoutInMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw new InterruptedException(e.getCause().getMessage());
- }
- }
- }
-
- @Override
- public PoolStats getPoolStats() {
- CallTracker.CallStats createStats;
- CallTracker.CallStats testStats;
- CallTracker.CallStats destroyStats;
-
- if (_lifeCycle.isWrapperFor(LifeCycleStatsCollector.class)) {
- LifeCycleStatsCollector statsCollector = _lifeCycle.unwrap(LifeCycleStatsCollector.class);
- createStats = statsCollector.getCreateCallStats();
- testStats = statsCollector.getTestCallStats();
- destroyStats = statsCollector.getDestroyCallStats();
- } else {
- createStats = NullStats.getInstance();
- testStats = NullStats.getInstance();
- destroyStats = NullStats.getInstance();
- }
-
- CallTracker.CallStats waitStats = _waitersCallTracker.getCallStats();
- CallTracker.CallStats checkedOut = _checkedOutCallTracker.getCallStats();
-
- return new PoolStatsImpl(
- _maximumEntities,
- _minimumEntities,
- _totalEntities.get(),
- _idleQueue.size(),
- createStats,
- testStats,
- destroyStats,
- waitStats,
- checkedOut);
- }
-
- private static class PoolStatsImpl implements PoolStats {
- private final int _maxPoolSize;
- private final int _minPoolSize;
- private final int _poolSize;
- private final int _idleCount;
- private final CallTracker.CallStats _createStats;
- private final CallTracker.CallStats _testStats;
- private final CallTracker.CallStats _destroyStats;
- private final CallTracker.CallStats _waitStats;
- private final CallTracker.CallStats _checkedOut;
-
- private PoolStatsImpl(
- int maxPoolSize,
- int minPoolSize,
- int poolSize,
- int idleCount,
- CallTracker.CallStats createStats,
- CallTracker.CallStats testStats,
- CallTracker.CallStats destroyStats,
- CallTracker.CallStats waitStats,
- CallTracker.CallStats checkedOut) {
- _maxPoolSize = maxPoolSize;
- _minPoolSize = minPoolSize;
- _poolSize = poolSize;
- _idleCount = idleCount;
- _createStats = createStats;
- _testStats = testStats;
- _destroyStats = destroyStats;
- _waitStats = waitStats;
- _checkedOut = checkedOut;
- }
-
- @Override
- public long getTotalCreated() {
- return _createStats.getCallCountTotal() - _createStats.getErrorCountTotal();
- }
-
- @Override
- public long getTotalDestroyed() {
- return _destroyStats.getCallCountTotal() - _destroyStats.getErrorCountTotal();
- }
-
- @Override
- public long getTotalCreateErrors() {
- return _createStats.getErrorCountTotal();
- }
-
- @Override
- public long getTotalDestroyErrors() {
- return _destroyStats.getErrorCountTotal();
- }
-
- @Override
- public double getCheckedOut1min() {
- return _checkedOut.getAverageConcurrency1min();
- }
-
- @Override
- public double getCheckedOut5min() {
- return _checkedOut.getAverageConcurrency5min();
- }
-
- @Override
- public double getCheckedOut15min() {
- return _checkedOut.getAverageConcurrency15min();
- }
-
- @Override
- public int getMaxCheckedOut1min() {
- return _checkedOut.getMaxConcurrency1min();
- }
-
- @Override
- public double getCheckedOutTimeAvg() {
- return nanosToMillis(_checkedOut.getCallTimeStats().getAverage());
- }
-
- @Override
- public double getCheckedOutTime50Pct() {
- return nanosToMillis(_checkedOut.getCallTimeStats().get50Pct());
- }
-
- @Override
- public double getCheckedOutTime95Pct() {
- return nanosToMillis(_checkedOut.getCallTimeStats().get95Pct());
- }
-
- @Override
- public double getCheckedOutTime99Pct() {
- return nanosToMillis(_checkedOut.getCallTimeStats().get99Pct());
- }
-
- @Override
- public int getMaxPoolSize() {
- return _maxPoolSize;
- }
-
- @Override
- public int getMinPoolSize() {
- return _minPoolSize;
- }
-
- @Override
- public int getPoolSize() {
- return _poolSize;
- }
-
- @Override
- public int getIdleCount() {
- return _idleCount;
- }
-
- @Override
- public double getWaiters1min() {
- return _waitStats.getAverageConcurrency1min();
- }
-
- @Override
- public double getWaiters5min() {
- return _waitStats.getAverageConcurrency5min();
- }
-
- @Override
- public double getWaiters15min() {
- return _waitStats.getAverageConcurrency15min();
- }
-
- @Override
- public double getWaitTimeAvg() {
- return nanosToMillis(_waitStats.getCallTimeStats().getAverage());
- }
-
- @Override
- public double getWaitTime50Pct() {
- return nanosToMillis(_waitStats.getCallTimeStats().get50Pct());
- }
-
- @Override
- public double getWaitTime95Pct() {
- return nanosToMillis(_waitStats.getCallTimeStats().get95Pct());
- }
-
- @Override
- public double getWaitTime99Pct() {
- return nanosToMillis(_waitStats.getCallTimeStats().get99Pct());
- }
-
- @Override
- public LifeCycleStats getLifecycleStats() {
- return new LifeCycleStatsImpl(
- _createStats.getCallTimeStats(),
- _testStats.getCallTimeStats(),
- _destroyStats.getCallTimeStats());
- }
- }
-
- private static class LifeCycleStatsImpl implements PoolStats.LifeCycleStats {
- private final LongStats _createCallStats;
- private final LongStats _testCallStats;
- private final LongStats _destroyCallStats;
-
- private LifeCycleStatsImpl(LongStats createCallStats, LongStats testCallStats, LongStats destroyCallStats) {
- _createCallStats = createCallStats;
- _testCallStats = testCallStats;
- _destroyCallStats = destroyCallStats;
- }
-
- @Override
- public double getCreateTimeAvg() {
- return nanosToMillis(_createCallStats.getAverage());
- }
-
- @Override
- public double getCreateTime50Pct() {
- return nanosToMillis(_createCallStats.get50Pct());
- }
-
- @Override
- public double getCreateTime95Pct() {
- return nanosToMillis(_createCallStats.get95Pct());
- }
-
- @Override
- public double getCreateTime99Pct() {
- return nanosToMillis(_createCallStats.get99Pct());
- }
-
- @Override
- public double getTestTimeAvg() {
- return nanosToMillis(_testCallStats.getAverage());
- }
-
- @Override
- public double getTestTime50Pct() {
- return nanosToMillis(_testCallStats.get50Pct());
- }
-
- @Override
- public double getTestTime95Pct() {
- return nanosToMillis(_testCallStats.get95Pct());
- }
-
- @Override
- public double getTestTime99Pct() {
- return nanosToMillis(_testCallStats.get99Pct());
- }
-
- @Override
- public double getDestroyTimeAvg() {
- return nanosToMillis(_destroyCallStats.getAverage());
- }
-
- @Override
- public double getDestroyTime50Pct() {
- return nanosToMillis(_destroyCallStats.get50Pct());
- }
-
- @Override
- public double getDestroyTime95Pct() {
- return nanosToMillis(_destroyCallStats.get95Pct());
-
- }
-
- @Override
- public double getDestroyTime99Pct() {
- return nanosToMillis(_destroyCallStats.get99Pct());
- }
- }
-
- private static double nanosToMillis(double nanos) {
- return Double.isFinite(nanos) ? nanos / 1000000.0 : Double.NaN;
- }
-
- private static double nanosToMillis(Long nanos) {
- return nanos != null ? nanos / 1000000.0 : Double.NaN;
- }
-
- private class IdleEntity implements Supplier {
- private final long _idleNanos;
- private final T _idleEntity;
-
- private IdleEntity(T idleEntity) {
- _idleNanos = Time.nanoTime();
- _idleEntity = idleEntity;
- }
-
- @Override
- public T get() {
- return _idleEntity;
- }
- }
-
- public static class TooManyWaitersException extends CancellationException {
- }
-
- public static class ShutdownException extends CancellationException {
- }
-}
diff --git a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/AsyncQOSPoolImpl.java b/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/AsyncQOSPoolImpl.java
deleted file mode 100644
index b934e0e7c35..00000000000
--- a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/AsyncQOSPoolImpl.java
+++ /dev/null
@@ -1,120 +0,0 @@
-package com.linkedin.alpini.base.pool.impl;
-
-import com.linkedin.alpini.base.pool.AsyncQOSPool;
-import com.linkedin.alpini.base.queuing.QOSBasedRequestRunnable;
-import com.linkedin.alpini.base.queuing.QOSPolicy;
-import com.linkedin.alpini.base.queuing.SimpleQueue;
-import com.linkedin.alpini.consts.QOS;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-
-/**
- * @author Antony T Curtis {@literal }
- */
-public class AsyncQOSPoolImpl extends AsyncPoolImpl implements AsyncQOSPool {
- private static final Logger LOG = LogManager.getLogger(AsyncQOSPool.class);
- private final SimpleQueue _pending;
-
- public AsyncQOSPoolImpl(
- LifeCycle lifeCycle,
- QOSPolicy.StaticConfig qosPolicyConfig,
- Executor executor,
- int maxConcurrentCreate,
- int minimumEntities,
- int maximumEntities,
- long maxIdleTime,
- TimeUnit maxIdleUnit) {
- this(
- lifeCycle,
- QOSPolicy.getQOSPolicy(Objects.requireNonNull(qosPolicyConfig, "qosPolicyConfig")),
- executor,
- maxConcurrentCreate,
- minimumEntities,
- maximumEntities,
- maxIdleTime,
- maxIdleUnit);
- }
-
- private AsyncQOSPoolImpl(
- LifeCycle lifeCycle,
- SimpleQueue pendingQueue,
- Executor executor,
- int maxConcurrentCreate,
- int minimumEntities,
- int maximumEntities,
- long maxIdleTime,
- TimeUnit maxIdleUnit) {
- super(
- lifeCycle,
- executor,
- maxConcurrentCreate,
- minimumEntities,
- minimumEntities,
- maximumEntities,
- maxIdleTime,
- maxIdleUnit);
- _pending = Objects.requireNonNull(pendingQueue, "pendingQueue");
- }
-
- @Override
- public CompletableFuture acquire() {
- return checkout(acquire0("", QOS.NORMAL), startWaiters());
- }
-
- @Override
- public CompletableFuture acquire(String queueName, QOS qos) {
- return checkout(acquire0(Objects.requireNonNull(queueName), Objects.requireNonNull(qos)), startWaiters());
- }
-
- protected final CompletableFuture acquire0(String queueName, QOS qos) {
- CompletableFuture future = super.acquire0();
- if (future.isCompletedExceptionally()) {
- return future;
- }
- try {
- Waiter w = new Waiter(queueName, qos, new CompletableFuture<>());
-
- if (_pending.add(w)) {
- return w._future;
- } else {
- w._future.obtrudeException(new IllegalStateException());
- return w._future;
- }
- } finally {
- future.whenComplete(this::issue);
- }
- }
-
- private void issue(T entity, Throwable ex) {
- Waiter w;
- if (ex == null) {
- while ((w = _pending.poll()) != null) {
- if (w._future.complete(entity)) {
- return;
- }
- }
- release0(entity);
- } else {
- while ((w = _pending.poll()) != null) {
- if (w._future.completeExceptionally(ex)) {
- return;
- }
- }
- LOG.warn("Dropped exception", ex);
- }
- }
-
- private class Waiter extends QOSBasedRequestRunnable {
- private final CompletableFuture _future;
-
- public Waiter(String queueName, QOS qos, CompletableFuture future) {
- super(queueName, qos, null);
- _future = future;
- }
- }
-}
diff --git a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/LifeCycleFilter.java b/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/LifeCycleFilter.java
deleted file mode 100644
index 92d006c6800..00000000000
--- a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/LifeCycleFilter.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package com.linkedin.alpini.base.pool.impl;
-
-import com.linkedin.alpini.base.pool.AsyncPool;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-
-
-/**
- * @author Antony T Curtis {@literal }
- */
-public class LifeCycleFilter implements AsyncPool.LifeCycle {
- private final AsyncPool.LifeCycle _lifeCycle;
-
- public LifeCycleFilter(AsyncPool.LifeCycle lifeCycle) {
- _lifeCycle = Objects.requireNonNull(lifeCycle);
- }
-
- @Override
- public CompletableFuture create() {
- return _lifeCycle.create();
- }
-
- @Override
- public CompletableFuture testOnRelease(T entity) {
- return _lifeCycle.testOnRelease(entity);
- }
-
- @Override
- public CompletableFuture testAfterIdle(T entity) {
- return _lifeCycle.testAfterIdle(entity);
- }
-
- @Override
- public CompletableFuture destroy(T entity) {
- return _lifeCycle.destroy(entity);
- }
-
- @Override
- public W unwrap(Class iface) {
- if (iface.isAssignableFrom(getClass())) {
- return iface.cast(this);
- } else {
- return _lifeCycle.unwrap(iface);
- }
- }
-
- @Override
- public boolean isWrapperFor(Class> iface) {
- return iface.isAssignableFrom(getClass()) || _lifeCycle.isWrapperFor(iface);
- }
-}
diff --git a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/LifeCycleStatsCollector.java b/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/LifeCycleStatsCollector.java
deleted file mode 100644
index 164020354cb..00000000000
--- a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/LifeCycleStatsCollector.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package com.linkedin.alpini.base.pool.impl;
-
-import com.linkedin.alpini.base.monitoring.CallCompletion;
-import com.linkedin.alpini.base.monitoring.CallTracker;
-import com.linkedin.alpini.base.pool.AsyncPool;
-import java.util.concurrent.CompletableFuture;
-import javax.annotation.Nonnull;
-
-
-/**
- * @author Antony T Curtis {@literal }
- */
-public class LifeCycleStatsCollector extends LifeCycleFilter {
- private final CallTracker _createCallTracker;
- private final CallTracker _testCallTracker;
- private final CallTracker _destroyCallTracker;
-
- public LifeCycleStatsCollector(@Nonnull AsyncPool.LifeCycle lifeCycle) {
- super(lifeCycle);
-
- _createCallTracker = createCallTracker();
- _testCallTracker = createCallTracker();
- _destroyCallTracker = createCallTracker();
- }
-
- protected CallTracker createCallTracker() {
- return CallTracker.create();
- }
-
- @Override
- public CompletableFuture create() {
- CallCompletion completion = _createCallTracker.startCall();
- return super.create().whenComplete(completion::closeCompletion);
- }
-
- public CallTracker.CallStats getCreateCallStats() {
- return _createCallTracker.getCallStats();
- }
-
- @Override
- public CompletableFuture testOnRelease(T entity) {
- CallCompletion completion = _testCallTracker.startCall();
- return super.testOnRelease(entity).whenComplete(completion::closeCompletion);
- }
-
- @Override
- public CompletableFuture testAfterIdle(T entity) {
- CallCompletion completion = _testCallTracker.startCall();
- return super.testAfterIdle(entity).whenComplete(completion::closeCompletion);
- }
-
- public CallTracker.CallStats getTestCallStats() {
- return _testCallTracker.getCallStats();
- }
-
- @Override
- public CompletableFuture destroy(T entity) {
- CallCompletion completion = _destroyCallTracker.startCall();
- return super.destroy(entity).whenComplete(completion::closeCompletion);
- }
-
- public CallTracker.CallStats getDestroyCallStats() {
- return _destroyCallTracker.getCallStats();
- }
-}
diff --git a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/NullStats.java b/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/NullStats.java
deleted file mode 100644
index 1f2794f95e1..00000000000
--- a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/NullStats.java
+++ /dev/null
@@ -1,166 +0,0 @@
-package com.linkedin.alpini.base.pool.impl;
-
-import com.linkedin.alpini.base.monitoring.CallTracker;
-import com.linkedin.alpini.base.statistics.LongStats;
-
-
-/**
- * @author Antony T Curtis {@literal }
- */
-public class NullStats implements CallTracker.CallStats {
- private static final NullStats INSTANCE = new NullStats();
-
- public static CallTracker.CallStats getInstance() {
- return INSTANCE;
- }
-
- private NullStats() {
- }
-
- @Override
- public long getCallCountTotal() {
- return 0;
- }
-
- @Override
- public long getCallStartCountTotal() {
- return 0;
- }
-
- @Override
- public long getErrorCountTotal() {
- return 0;
- }
-
- @Override
- public int getConcurrency() {
- return 0;
- }
-
- @Override
- public double getAverageConcurrency1min() {
- return 0;
- }
-
- @Override
- public double getAverageConcurrency5min() {
- return 0;
- }
-
- @Override
- public double getAverageConcurrency15min() {
- return 0;
- }
-
- @Override
- public int getMaxConcurrency1min() {
- return 0;
- }
-
- @Override
- public int getMaxConcurrency5min() {
- return 0;
- }
-
- @Override
- public int getMaxConcurrency15min() {
- return 0;
- }
-
- @Override
- public int getStartFrequency1min() {
- return 0;
- }
-
- @Override
- public int getStartFrequency5min() {
- return 0;
- }
-
- @Override
- public int getStartFrequency15min() {
- return 0;
- }
-
- @Override
- public int getErrorFrequency1min() {
- return 0;
- }
-
- @Override
- public int getErrorFrequency5min() {
- return 0;
- }
-
- @Override
- public int getErrorFrequency15min() {
- return 0;
- }
-
- @Override
- public long getOutstandingStartTimeAvg() {
- return 0;
- }
-
- @Override
- public int getOutstandingCount() {
- return 0;
- }
-
- private final LongStats _longStats = new LongStats() {
- @Override
- public long getLongCount() {
- return 0;
- }
-
- @Override
- public double getAverage() {
- return 0;
- }
-
- @Override
- public double getStandardDeviation() {
- return Double.NaN;
- }
-
- @Override
- public Long getMinimum() {
- return null;
- }
-
- @Override
- public Long getMaximum() {
- return null;
- }
-
- @Override
- public Long get50Pct() {
- return null;
- }
-
- @Override
- public Long get90Pct() {
- return null;
- }
-
- @Override
- public Long get95Pct() {
- return null;
- }
-
- @Override
- public Long get99Pct() {
- return null;
- }
-
- @Override
- public Long get99_9Pct() {
- return null;
- }
- };
-
- @Override
- public LongStats getCallTimeStats() {
- return _longStats;
- }
-}
diff --git a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/RateLimitedCreateLifeCycle.java b/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/RateLimitedCreateLifeCycle.java
deleted file mode 100644
index d4f5416ccec..00000000000
--- a/internal/alpini/common/alpini-common-base/src/main/java/com/linkedin/alpini/base/pool/impl/RateLimitedCreateLifeCycle.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package com.linkedin.alpini.base.pool.impl;
-
-import com.linkedin.alpini.base.misc.Time;
-import com.linkedin.alpini.base.pool.AsyncPool;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nonnull;
-
-
-/**
- * @author Antony T Curtis {@literal }
- */
-public class RateLimitedCreateLifeCycle