Skip to content

Commit

Permalink
Don't increase the pressure if close to the compaction throughput limit
Browse files Browse the repository at this point in the history
  • Loading branch information
pkolaczk committed Nov 27, 2024
1 parent e8cc355 commit 7222496
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
25 changes: 22 additions & 3 deletions src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.memtable.AbstractAllocatorMemtable;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.metrics.DefaultNameFactory;
import org.apache.cassandra.metrics.MetricNameFactory;
Expand Down Expand Up @@ -128,6 +129,16 @@ private static AdaptiveCompressor createForCompaction(int minCompressionLevel, i
return instances.computeIfAbsent(params, p -> new AdaptiveCompressor(p, compactionPressureSupplier));
}

/**
* Creates a compressor that doesn't refer to any other C* components like compaction manager or memory pools.
*/
@VisibleForTesting
public static ICompressor createForUnitTesting()
{
Params params = new Params(Uses.GENERAL, 9, 9, 0);
return new AdaptiveCompressor(params, () -> 0.0);
}

public static AdaptiveCompressor createForFlush(Map<String, String> options)
{
int minCompressionLevel = getMinCompressionLevel(Uses.FAST_COMPRESSION, options);
Expand All @@ -145,6 +156,7 @@ private static AdaptiveCompressor createForFlush(int minCompressionLevel, int ma
private final ThreadLocal<State> state;
private final Supplier<Double> writePressureSupplier;


static class Params
{
final Uses use;
Expand Down Expand Up @@ -422,13 +434,20 @@ private static double getFlushPressure()
private static double getCompactionPressure(int maxCompactionQueueLength)
{
CompactionManager compactionManager = CompactionManager.instance;
double rateLimit = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1000 * 1000;
double actualRate = compactionManager.getMetrics().meanCompactionReadThroughput.getValue();
long rateLimit = DatabaseDescriptor.getCompactionThroughputMbPerSec() * FileUtils.ONE_MB;
if (rateLimit == 0)
rateLimit = Long.MAX_VALUE;
double actualRate = compactionManager.getMetrics().bytesCompactedThroughput.getOneMinuteRate();
// We don't want to speed up compression if we can keep up with the configured compression rate limit
// 0.0 if actualRate >= rateLimit
// 1.0 if actualRate <= 0.8 * rateLimit;
double rateLimitFactor = Math.min(1.0, Math.max(0.0, (rateLimit - actualRate) / (0.2 * rateLimit)));

long pendingCompactions = compactionManager.getPendingTasks();
long activeCompactions = compactionManager.getActiveCompactions();
long queuedCompactions = pendingCompactions - activeCompactions;
return (double) queuedCompactions / (maxCompactionQueueLength * DatabaseDescriptor.getConcurrentCompactors());
double compactionQueuePressure = Math.min(1.0, (double) queuedCompactions / (maxCompactionQueueLength * DatabaseDescriptor.getConcurrentCompactors()));
return compactionQueuePressure * rateLimitFactor;
}

private int clampCompressionLevel(long compressionLevel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static void initialize()
DeflateCompressor.create(Collections.<String, String>emptyMap()),
SnappyCompressor.create(Collections.<String, String>emptyMap()),
ZstdCompressor.create(Collections.emptyMap()),
AdaptiveCompressor.create(Collections.emptyMap()),
AdaptiveCompressor.createForUnitTesting(),
NoopCompressor.create(Collections.emptyMap())
};

Expand Down Expand Up @@ -202,7 +202,7 @@ public void testZstdByteBuffers() throws IOException
@Test
public void testAdaptiveByteBuffers() throws IOException
{
compressor = AdaptiveCompressor.create(Collections.<String, String>emptyMap());
compressor = AdaptiveCompressor.createForUnitTesting();
testByteBuffers();
}

Expand Down

0 comments on commit 7222496

Please sign in to comment.