Skip to content

Commit

Permalink
Don't increase the pressure if close to the compaction throughput limit
Browse files Browse the repository at this point in the history
If we can compress fast enough that the RateLimiter kicks in,
there is no point in speeding the compression up and making the
compression ratio worse.
  • Loading branch information
pkolaczk committed Nov 26, 2024
1 parent 9bdc785 commit 787932c
Show file tree
Hide file tree
Showing 23 changed files with 208 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import com.google.common.collect.Multiset;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.RateLimiter;

import org.apache.cassandra.utils.concurrent.RateLimiter;
import org.apache.cassandra.io.storage.StorageProvider;
import org.apache.cassandra.io.util.File;
import org.slf4j.Logger;
Expand Down
4 changes: 1 addition & 3 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -84,6 +83,7 @@
import org.apache.cassandra.db.compaction.CompactionStrategyManager;
import org.apache.cassandra.db.compaction.CompactionStrategyOptions;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.utils.concurrent.RateLimiter;
import org.apache.cassandra.db.compaction.TableOperation;
import org.apache.cassandra.db.compaction.Verifier;
import org.apache.cassandra.db.compaction.unified.Environment;
Expand All @@ -102,7 +102,6 @@
import org.apache.cassandra.db.partitions.CachedPartition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.repair.CassandraTableRepairManager;
import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.db.streaming.CassandraStreamManager;
import org.apache.cassandra.db.view.TableViews;
import org.apache.cassandra.dht.AbstractBounds;
Expand Down Expand Up @@ -140,7 +139,6 @@
import org.apache.cassandra.nodes.Nodes;
import org.apache.cassandra.repair.TableRepairManager;
import org.apache.cassandra.repair.consistent.admin.PendingStat;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.schema.CompressionParams;
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/Directories.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.concurrent.RateLimiter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.FSDiskFullWriteError;
import org.apache.cassandra.io.FSError;
Expand Down
3 changes: 1 addition & 2 deletions src/java/org/apache/cassandra/db/Keyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,7 +49,6 @@
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.UncheckedInternalRequestExecutionException;
import org.apache.cassandra.exceptions.UnknownKeyspaceException;
import org.apache.cassandra.exceptions.UnknownTableException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.SecondaryIndexManager;
Expand All @@ -73,6 +71,7 @@
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.RateLimiter;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.OpOrder;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import javax.annotation.Nullable;

import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,6 +35,7 @@
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.utils.concurrent.RateLimiter;
import org.apache.cassandra.utils.concurrent.OpOrder;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;

import org.apache.cassandra.db.compaction.writers.SSTableDataSink;
import org.apache.cassandra.db.rows.BTreeRow;
Expand All @@ -40,6 +39,7 @@
import org.apache.cassandra.io.sstable.compaction.SkipEmptyDataCursor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.concurrent.RateLimiter;

/**
* Counterpart to CompactionIterator. Maintains sstable cursors, applies limiter and produces metrics. In the future it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -122,6 +121,7 @@
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.NonThrowingCloseable;
import org.apache.cassandra.utils.concurrent.RateLimiter;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.WrappedRunnable;
Expand Down Expand Up @@ -263,6 +263,15 @@ public CompactionMetrics getMetrics()
return metrics;
}

/**
* Gets the sum of current total read throughput of all compaction tasks in real-time.
* Internally uses the data from the rate limiter so it is more up to date than the values recorded in metrics.
*/
public double getCompactionThroughputReadBytesPerSec()
{
return compactionRateLimiter.getActualRate();
}

/**
* Gets compaction rate limiter.
* Rate unit is bytes per sec.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -55,6 +54,7 @@
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.RateLimiter;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.Refs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,13 +422,18 @@ private static double getFlushPressure()
private static double getCompactionPressure(int maxCompactionQueueLength)
{
CompactionManager compactionManager = CompactionManager.instance;
double rateLimit = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1000 * 1000;
double actualRate = compactionManager.getMetrics().meanCompactionReadThroughput.getValue();
double rateLimit = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024;
double actualRate = compactionManager.getCompactionThroughputReadBytesPerSec();
// We don't want to speed up compression if we can keep up with the configured compression rate limit
// 0.0 if actualRate >= rateLimit
// 1.0 if actualRate <= 0.8 * rateLimit;
double rateLimitFactor = Math.min(1.0, Math.max(0.0, 5.0 * (rateLimit - actualRate) / rateLimit));

long pendingCompactions = compactionManager.getPendingTasks();
long activeCompactions = compactionManager.getActiveCompactions();
long queuedCompactions = pendingCompactions - activeCompactions;
return (double) queuedCompactions / (maxCompactionQueueLength * DatabaseDescriptor.getConcurrentCompactors());
double compactionQueuePressure = Math.min(1.0, (double) queuedCompactions / (maxCompactionQueueLength * DatabaseDescriptor.getConcurrentCompactors()));
return compactionQueuePressure * rateLimitFactor;
}

private int clampCompressionLevel(long compressionLevel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import java.io.IOException;

import com.google.common.util.concurrent.RateLimiter;

import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ClusteringBoundOrBoundary;
import org.apache.cassandra.db.ClusteringPrefix;
Expand All @@ -41,6 +39,7 @@
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.concurrent.RateLimiter;

/**
* Cursor over sstable data files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -77,6 +76,7 @@
import org.apache.cassandra.db.Slices;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionSSTable;
import org.apache.cassandra.utils.concurrent.RateLimiter;
import org.apache.cassandra.db.compaction.Scrubber;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.lifecycle.AbstractLogTransaction;
Expand Down
3 changes: 1 addition & 2 deletions src/java/org/apache/cassandra/io/util/File.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@

import javax.annotation.Nullable;

import com.google.common.util.concurrent.RateLimiter;

import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.utils.concurrent.RateLimiter;

import static org.apache.cassandra.io.util.PathUtils.filename;
import static org.apache.cassandra.utils.Throwables.maybeFail;
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/io/util/FileHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import java.util.Optional;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.cache.ChunkCache;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.utils.concurrent.RateLimiter;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.RefCounted;
Expand Down
4 changes: 1 addition & 3 deletions src/java/org/apache/cassandra/io/util/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,15 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -68,6 +65,7 @@
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.utils.DseLegacy;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.RateLimiter;
import org.apache.cassandra.utils.SyncUtil;

import static com.google.common.base.Throwables.propagate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import javax.annotation.concurrent.NotThreadSafe;

import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.RateLimiter;

import org.apache.cassandra.utils.concurrent.RateLimiter;

/**
* Rebufferer wrapper that applies rate limiting.
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/io/util/PathUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,6 +41,7 @@
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.storage.StorageProvider;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.RateLimiter;

import static java.nio.file.StandardOpenOption.*;
import static java.nio.file.StandardOpenOption.CREATE;
Expand Down
5 changes: 1 addition & 4 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Scanner;
import java.util.Set;
Expand All @@ -50,7 +49,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -90,7 +88,6 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand All @@ -115,7 +112,6 @@
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.SizeEstimatesRecorder;
import org.apache.cassandra.db.SnapshotDetailsTabularData;
import org.apache.cassandra.db.SystemKeyspace;
Expand Down Expand Up @@ -206,6 +202,7 @@
import org.apache.cassandra.utils.MD5Digest;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.RateLimiter;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.WindowsTimer;
import org.apache.cassandra.utils.WrappedRunnable;
Expand Down
27 changes: 27 additions & 0 deletions src/java/org/apache/cassandra/utils/ExpMovingAverage.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.cassandra.utils;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AtomicDouble;

/**
Expand Down Expand Up @@ -93,6 +94,32 @@ public MovingAverage update(double val)
return this;
}

/**
* Same as {@link #update(double)} but allows to scale the decay factor for each update separately.
* For integer n, has the same effect as calling update(val) n times.
* Useful when we want to do decay based not on the number of samples, but e.g. based on time,
* when time that passed between updates is variable.
*/
public MovingAverage update(double val, double n)
{
Preconditions.checkArgument(n > 0.0, "n must be > 0.0");
double current, update;
do
{
current = average.get();

if (!Double.isNaN(current))
update = current + Math.pow(alpha, n) * (val - current);
else
update = val; // Not initialized yet. Incidentally, passing NaN will cause reinitialization on the
// next update.
}
while (!average.compareAndSet(current, update));

return this;
}


@Override
public double get()
{
Expand Down
Loading

0 comments on commit 787932c

Please sign in to comment.