diff --git a/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java b/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java index b4179745da2f..e83a2c859209 100644 --- a/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java +++ b/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java @@ -36,6 +36,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.memtable.AbstractAllocatorMemtable; +import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.metrics.DefaultNameFactory; import org.apache.cassandra.metrics.MetricNameFactory; @@ -128,6 +129,16 @@ private static AdaptiveCompressor createForCompaction(int minCompressionLevel, i return instances.computeIfAbsent(params, p -> new AdaptiveCompressor(p, compactionPressureSupplier)); } + /** + * Creates a compressor that doesn't refer to any other C* components like compaction manager or memory pools. 
+ */ + @VisibleForTesting + public static ICompressor createForUnitTesting() + { + Params params = new Params(Uses.GENERAL, 9, 9, 0); + return new AdaptiveCompressor(params, () -> 0.0); + } + public static AdaptiveCompressor createForFlush(Map options) { int minCompressionLevel = getMinCompressionLevel(Uses.FAST_COMPRESSION, options); @@ -145,6 +156,7 @@ private static AdaptiveCompressor createForFlush(int minCompressionLevel, int ma private final ThreadLocal state; private final Supplier writePressureSupplier; + static class Params { final Uses use; @@ -422,13 +434,20 @@ private static double getFlushPressure() private static double getCompactionPressure(int maxCompactionQueueLength) { CompactionManager compactionManager = CompactionManager.instance; - double rateLimit = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1000 * 1000; - double actualRate = compactionManager.getMetrics().meanCompactionReadThroughput.getValue(); + long rateLimit = DatabaseDescriptor.getCompactionThroughputMbPerSec() * FileUtils.ONE_MB; + if (rateLimit == 0) + rateLimit = Long.MAX_VALUE; + double actualRate = compactionManager.getMetrics().bytesCompactedThroughput.getOneMinuteRate(); + // We don't want to speed up compression if we can keep up with the configured compaction rate limit + // 0.0 if actualRate >= rateLimit + // 1.0 if actualRate <= 0.8 * rateLimit; + double rateLimitFactor = Math.min(1.0, Math.max(0.0, (rateLimit - actualRate) / (0.2 * rateLimit))); long pendingCompactions = compactionManager.getPendingTasks(); long activeCompactions = compactionManager.getActiveCompactions(); long queuedCompactions = pendingCompactions - activeCompactions; + double compactionQueuePressure = Math.min(1.0, (double) queuedCompactions / (maxCompactionQueueLength * DatabaseDescriptor.getConcurrentCompactors())); - return (double) queuedCompactions / (maxCompactionQueueLength * DatabaseDescriptor.getConcurrentCompactors()); + return compactionQueuePressure * rateLimitFactor; } private 
int clampCompressionLevel(long compressionLevel) diff --git a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java index 42ad77a95ded..7cf37844892e 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java @@ -56,7 +56,7 @@ public static void initialize() DeflateCompressor.create(Collections.emptyMap()), SnappyCompressor.create(Collections.emptyMap()), ZstdCompressor.create(Collections.emptyMap()), - AdaptiveCompressor.create(Collections.emptyMap()), + AdaptiveCompressor.createForUnitTesting(), NoopCompressor.create(Collections.emptyMap()) }; @@ -202,7 +202,7 @@ public void testZstdByteBuffers() throws IOException @Test public void testAdaptiveByteBuffers() throws IOException { - compressor = AdaptiveCompressor.create(Collections.emptyMap()); + compressor = AdaptiveCompressor.createForUnitTesting(); testByteBuffers(); }