diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index b6f4d4bf5b39..b66120417776 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -479,6 +479,8 @@ commitlog_segment_size_in_mb: 32
# none : Flush without compressing blocks but while still doing checksums.
# fast : Flush with a fast compressor. If the table is already using a
# fast compressor that compressor is used.
+# adaptive : Flush with a fast adaptive compressor. If the table is already using a
+# fast compressor that compressor is used.
# table: Always flush with the same compressor that the table uses. This
# was the pre 4.0 behavior.
#
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 68ce61c7900d..655256e4e45c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -20,6 +20,7 @@
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -37,6 +38,7 @@
import org.apache.cassandra.fql.FullQueryLoggerOptions;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.guardrails.GuardrailsConfig;
+import org.apache.cassandra.io.compress.AdaptiveCompressor;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -292,8 +294,9 @@ public class Config
public double commitlog_sync_group_window_in_ms = Double.NaN;
public int commitlog_sync_period_in_ms;
public int commitlog_segment_size_in_mb = 32;
+
public ParameterizedClass commitlog_compression;
- public FlushCompression flush_compression = FlushCompression.fast;
+ public FlushCompression flush_compression;
public int commitlog_max_compression_buffers_in_pool = 3;
public Integer periodic_commitlog_sync_lag_block_in_ms;
public TransparentDataEncryptionOptions transparent_data_encryption_options = new TransparentDataEncryptionOptions();
@@ -664,6 +667,7 @@ public enum FlushCompression
{
none,
fast,
+ adaptive,
table
}
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 84b2278909c1..7c54de174f17 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2119,7 +2119,9 @@ public static void setCommitLogCompression(ParameterizedClass compressor)
public static Config.FlushCompression getFlushCompression()
{
- return conf.flush_compression;
+ return Objects.requireNonNullElseGet(conf.flush_compression, () -> shouldUseAdaptiveCompressionByDefault()
+ ? Config.FlushCompression.adaptive
+ : Config.FlushCompression.fast);
}
public static void setFlushCompression(Config.FlushCompression compression)
@@ -2127,7 +2129,12 @@ public static void setFlushCompression(Config.FlushCompression compression)
conf.flush_compression = compression;
}
- /**
+ public static boolean shouldUseAdaptiveCompressionByDefault()
+ {
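+ // Enabled by starting the node with -Dcassandra.default_sstable_compression=adaptive;
+ // any other value, or no value at all, keeps the 'fast' (LZ4-based) default.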
+ return System.getProperty("cassandra.default_sstable_compression", "fast").equals("adaptive");
+ }
+
+ /**
* Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
* (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
* more, depending on how soon the sync policy stops all writing threads.
diff --git a/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java b/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java
new file mode 100644
index 000000000000..e83a2c859209
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import com.github.luben.zstd.Zstd;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.memtable.AbstractAllocatorMemtable;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.metrics.CassandraMetricsRegistry;
+import org.apache.cassandra.metrics.DefaultNameFactory;
+import org.apache.cassandra.metrics.MetricNameFactory;
+import org.apache.cassandra.utils.ExpMovingAverage;
+
+/**
+ * A compressor that dynamically adapts the compression level to the load.
+ * If the system is not heavily loaded by writes, data are compressed at a high compression level.
+ * If compactions or flushes start to queue up, the compression level is decreased to speed up
+ * flushing / compacting.
+ *
+ * Underneath, the ZStandard compressor is used. The compression level can be changed between frames,
+ * even when compressing the same sstable file. ZStandard was chosen because at the fast end it
+ * can reach the compression speed of LZ4, while at moderate compression levels it usually offers a much better
+ * compression ratio (typically files are smaller by 20-40%) than LZ4 without compromising compression speed too much.
+ *
+ * This compressor can be used for either of the Uses: FAST_COMPRESSION or GENERAL.
+ * Each use can have different minimum and maximum compression level limits.
+ * For FAST_COMPRESSION, the number of pending flushes is used as the indicator of write load.
+ * For GENERAL compression, the number of pending compactions is used as the indicator of write load.
+ *
+ * Valid compression levels are in range 0..15 (inclusive), where 0 means fastest compression and 15 means slowest/best.
+ * Levels around 7-11 usually strike the best balance between performance and compression ratio.
+ * Going above level 12 usually only results in slower compression with little improvement in compression ratio.
+ *
+ * Caution: This compressor decompresses about 2x-4x slower than LZ4Compressor, regardless of the compression level.
+ * Therefore, it may negatively affect read speed from very read-heavy tables, especially when the chunk-cache
+ * hit ratio is low. In synthetic tests with chunk cache disabled, read throughput turned out to be up to 10%
+ * lower than when using LZ4 on some workloads.
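+ *
+ * A table can be configured to use this compressor, for example (keyspace/table names and level
+ * values below are illustrative only):
+ *   CREATE TABLE ks.tbl (k text PRIMARY KEY, v text)
+ *     WITH compression = {'sstable_compression': 'AdaptiveCompressor',
+ *                         'min_compression_level': '7', 'max_compression_level': '12'};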
+ */
+public class AdaptiveCompressor implements ICompressor
+{
+ @VisibleForTesting
+ static final Map<Uses, Metrics> metrics = new EnumMap<>(Map.of(
+ Uses.FAST_COMPRESSION, new Metrics(Uses.FAST_COMPRESSION),
+ Uses.GENERAL, new Metrics(Uses.GENERAL)
+ ));
+
+ protected static final String MIN_COMPRESSION_LEVEL_OPTION_NAME = "min_compression_level";
+ protected static final String MAX_COMPRESSION_LEVEL_OPTION_NAME = "max_compression_level";
+ protected static final String MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME = "max_compaction_queue_length";
+
+ /**
+ * Maps AdaptiveCompressor compression level to underlying ZStandard compression levels.
+ * This mapping is needed because ZStandard levels are not continuous: zstd level 0 is special and means level 3.
+ * Hence, we just use our own continuous scale starting at 0.
+ */
+ private static final int[] zstdCompressionLevels = {
+ -7, // 0 (very fast but compresses poorly)
+ -6, // 1
+ -5, // 2 (LZ4 level is somewhere here)
+ -4, // 3
+ -3, // 4
+ -2, // 5
+ -1, // 6
+ 1, // 7 (sweet spot area usually here)
+ 2, // 8 (sweet spot area usually here)
+ 3, // 9 (sweet spot area usually here, ~50% slower than LZ4)
+ 4, // 10 (sweet spot area usually here)
+ 5, // 11 (sweet spot area usually here)
+ 6, // 12
+ 7, // 13
+ 8, // 14
+ 9, // 15 (very slow, usually over 10x slower than LZ4)
+ };
+
+ public static final int MIN_COMPRESSION_LEVEL = 0;
+ public static final int MAX_COMPRESSION_LEVEL = 15;
+
+ public static final int DEFAULT_MIN_FAST_COMPRESSION_LEVEL = 2; // zstd level -5
+ public static final int DEFAULT_MAX_FAST_COMPRESSION_LEVEL = 9; // zstd level 5
+ public static final int DEFAULT_MIN_GENERAL_COMPRESSION_LEVEL = 7; // zstd level 1
+ public static final int DEFAULT_MAX_GENERAL_COMPRESSION_LEVEL = 12; // zstd level 6
+ public static final int DEFAULT_MAX_COMPACTION_QUEUE_LENGTH = 16;
+
+ private static final ConcurrentHashMap<Params, AdaptiveCompressor> instances = new ConcurrentHashMap<>();
+
+ public static AdaptiveCompressor create(Map<String, String> options)
+ {
+ int minCompressionLevel = getMinCompressionLevel(Uses.GENERAL, options);
+ int maxCompressionLevel = getMaxCompressionLevel(Uses.GENERAL, options);
+ int maxCompactionQueueLength = getMaxCompactionQueueLength(options);
+ return createForCompaction(minCompressionLevel, maxCompressionLevel, maxCompactionQueueLength);
+ }
+
+ private static AdaptiveCompressor createForCompaction(int minCompressionLevel, int maxCompressionLevel, int maxCompactionQueueLength)
+ {
+ Params params = new Params(Uses.GENERAL, minCompressionLevel, maxCompressionLevel, maxCompactionQueueLength);
+ Supplier<Double> compactionPressureSupplier = () -> getCompactionPressure(maxCompactionQueueLength);
+ return instances.computeIfAbsent(params, p -> new AdaptiveCompressor(p, compactionPressureSupplier));
+ }
+
+ /**
+ * Creates a compressor that doesn't refer to any other C* components like compaction manager or memory pools.
+ */
+ @VisibleForTesting
+ public static ICompressor createForUnitTesting()
+ {
+ Params params = new Params(Uses.GENERAL, 9, 9, 0);
+ return new AdaptiveCompressor(params, () -> 0.0);
+ }
+
+ public static AdaptiveCompressor createForFlush(Map<String, String> options)
+ {
+ int minCompressionLevel = getMinCompressionLevel(Uses.FAST_COMPRESSION, options);
+ int maxCompressionLevel = getMaxCompressionLevel(Uses.FAST_COMPRESSION, options);
+ return createForFlush(minCompressionLevel, maxCompressionLevel);
+ }
+
+ private static AdaptiveCompressor createForFlush(int minCompressionLevel, int maxCompressionLevel)
+ {
+ Params params = new Params(Uses.FAST_COMPRESSION, minCompressionLevel, maxCompressionLevel, 0);
+ return instances.computeIfAbsent(params, p -> new AdaptiveCompressor(p, AdaptiveCompressor::getFlushPressure));
+ }
+
+ private final Params params;
+ private final ThreadLocal<State> state;
+ private final Supplier<Double> writePressureSupplier;
+
+
+ static class Params
+ {
+ final Uses use;
+ final int minCompressionLevel;
+ final int maxCompressionLevel;
+ final int maxCompactionQueueLength;
+
+ Params(Uses use, int minCompressionLevel, int maxCompressionLevel, int maxCompactionQueueLength)
+ {
+ if (minCompressionLevel < MIN_COMPRESSION_LEVEL || minCompressionLevel > MAX_COMPRESSION_LEVEL)
+ throw new IllegalArgumentException("Min compression level " + minCompressionLevel + "out of range" +
+ " [" + MIN_COMPRESSION_LEVEL + ", " + MAX_COMPRESSION_LEVEL + ']');
+ if (maxCompressionLevel < MIN_COMPRESSION_LEVEL || maxCompressionLevel > MAX_COMPRESSION_LEVEL)
+ throw new IllegalArgumentException("Max compression level " + maxCompressionLevel + "out of range" +
+ " [" + MIN_COMPRESSION_LEVEL + ", " + MAX_COMPRESSION_LEVEL + ']');
+ if (maxCompactionQueueLength < 0)
+ throw new IllegalArgumentException("Negative max compaction queue length: " + maxCompactionQueueLength);
+
+ this.use = use;
+ this.minCompressionLevel = minCompressionLevel;
+ this.maxCompressionLevel = maxCompressionLevel;
+ this.maxCompactionQueueLength = maxCompactionQueueLength;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (o == null || getClass() != o.getClass()) return false;
+ Params params = (Params) o;
+ return minCompressionLevel == params.minCompressionLevel && maxCompressionLevel == params.maxCompressionLevel && use == params.use;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(use, minCompressionLevel, maxCompressionLevel);
+ }
+ }
+
+ /**
+ * Keeps thread local state.
+ * We need this because we want to monitor not only pending flushes/compactions but also how much
+ * time the thread spends in compression relative to non-compression tasks such as preparing
+ * data or writing. Because an ICompressor can be shared by multiple threads, we need to keep
+ * track of each thread separately.
+ */
+ class State
+ {
+ /**
+ * ZStandard compression level that was used when compressing the previous chunk.
+ * Can be adjusted up or down by at most 1 with every next block.
+ */
+ int currentCompressionLevel;
+
+ /**
+ * How much time is spent by the thread in compression relative to the time spent in non-compression code.
+ * Valid range is [0.0, 1.0].
+ * 1.0 means we're doing only compression and nothing else.
+ * 0.0 means we're not spending any time doing compression.
+ * This indicator allows us to detect whether we're bottlenecked by something other than compression,
+ * e.g. by disk I/O or by preparation of the data to compress (e.g. iterating the memtable trie).
+ * If this value is low, then there is not much gain in decreasing the compression
+ * level.
+ */
+ ExpMovingAverage relativeTimeSpentCompressing = ExpMovingAverage.decayBy10();
+
+ long lastCompressionStartTime;
+ long lastCompressionDuration;
+
+ /**
+ * Computes the new compression level to use for the next chunk, based on the load.
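+ * For example, with minCompressionLevel = 2, maxCompressionLevel = 8 and pressure = 0.5,
+ * the target level is 8 - (int) (0.5 * (8 - 2)) = 5; the current level then moves towards the target
+ * by at most one step per chunk (numbers chosen for illustration only).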
+ */
+ public int adjustAndGetCompressionLevel(long currentTime)
+ {
+ // The more write "pressure", the faster we want to go, so the lower the desired compression level.
+ double pressure = getWritePressure();
+ assert pressure >= 0.0 && pressure <= 1.0 : "pressure (" + pressure + ") out of valid range [0.0, 1.0]";
+
+ // Use minCompressionLevel when pressure = 1.0, maxCompressionLevel when pressure = 0.0
+ int pressurePoints = (int) (pressure * (params.maxCompressionLevel - params.minCompressionLevel));
+ int compressionLevelTarget = params.maxCompressionLevel - pressurePoints;
+
+ // We use wall clock time and not CPU time, because we also want to include time spent by I/O.
+ // If we're bottlenecked by writing the data to disk, this indicator should be low.
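+ // For illustration: if compressing the previous chunk took 2 ms and 10 ms have passed since that
+ // compression started (i.e. ~8 ms went into preparing / writing data), the instantaneous ratio is ~0.2.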
+ double relativeTimeSpentCompressing = (double) (1 + lastCompressionDuration) / (1 + currentTime - lastCompressionStartTime);
+
+ // Some smoothing is needed to avoid changing level too fast due to performance hiccups
+ this.relativeTimeSpentCompressing.update(relativeTimeSpentCompressing);
+
+ // If we're under pressure to write data fast, we need to decrease compression level.
+ // But we do that only if we're really spending a significant amount of time doing compression.
+ if (compressionLevelTarget < currentCompressionLevel && this.relativeTimeSpentCompressing.get() > 0.1)
+ currentCompressionLevel--;
+ // If we're not under heavy write pressure, or we're spending very little time compressing data,
+ // we can increase the compression level and get some space savings at a low performance overhead:
+ else if (compressionLevelTarget > currentCompressionLevel || this.relativeTimeSpentCompressing.get() < 0.02)
+ currentCompressionLevel++;
+
+ currentCompressionLevel = clampCompressionLevel(currentCompressionLevel);
+ return currentCompressionLevel;
+ }
+
+ /**
+ * Must be called after compressing a chunk,
+ * so we can measure how much time is spent compressing vs. not compressing.
+ */
+ public void recordCompressionDuration(long startTime, long endTime)
+ {
+ this.lastCompressionDuration = endTime - startTime;
+ this.lastCompressionStartTime = startTime;
+ }
+
+ @VisibleForTesting
+ double getRelativeTimeSpentCompressing()
+ {
+ return this.relativeTimeSpentCompressing.get();
+ }
+ }
+
+ /**
+ * @param params user-provided configuration such as min/max compression level range
+ * @param writePressureSupplier returns a score in the range [0.0, 1.0] describing the write load on the
+ * system, used to control the desired compression level. Influences the
+ * compression level linearly: the target level falls from maxCompressionLevel
+ * at pressure 0.0 down to minCompressionLevel at pressure 1.0.
+ */
+ @VisibleForTesting
+ AdaptiveCompressor(Params params, Supplier<Double> writePressureSupplier)
+ {
+ this.params = params;
+ this.state = new ThreadLocal<>();
+ this.writePressureSupplier = writePressureSupplier;
+ }
+
+ @Override
+ public int initialCompressedBufferLength(int chunkLength)
+ {
+ return (int) Zstd.compressBound(chunkLength);
+ }
+
+
+ @Override
+ public void compress(ByteBuffer input, ByteBuffer output) throws IOException
+ {
+ try
+ {
+ State state = getThreadLocalState();
+ long startTime = System.nanoTime();
+ int compressionLevel = zstdCompressionLevels[state.adjustAndGetCompressionLevel(startTime)];
+ Zstd.compress(output, input, compressionLevel, true);
+ long endTime = System.nanoTime();
+ state.recordCompressionDuration(startTime, endTime);
+ metrics.get(params.use).updateFrom(state);
+ }
+ catch (Exception e)
+ {
+ throw new IOException("Compression failed", e);
+ }
+
+ }
+
+ @Override
+ public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
+ {
+ long dsz = Zstd.decompressByteArray(output, outputOffset, output.length - outputOffset,
+ input, inputOffset, inputLength);
+
+ if (Zstd.isError(dsz))
+ throw new IOException(String.format("Decompression failed due to %s", Zstd.getErrorName(dsz)));
+
+ return (int) dsz;
+ }
+
+ @Override
+ public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+ {
+ try
+ {
+ Zstd.decompress(output, input);
+ } catch (Exception e)
+ {
+ throw new IOException("Decompression failed", e);
+ }
+ }
+
+ @Override
+ public BufferType preferredBufferType()
+ {
+ return BufferType.OFF_HEAP;
+ }
+
+ @Override
+ public Set<Uses> recommendedUses()
+ {
+ return params.minCompressionLevel <= DEFAULT_MIN_FAST_COMPRESSION_LEVEL
+ ? EnumSet.of(Uses.GENERAL, Uses.FAST_COMPRESSION)
+ : EnumSet.of(params.use);
+ }
+
+ @Override
+ public ICompressor forUse(Uses use)
+ {
+ if (use == params.use)
+ return this;
+
+ switch (use)
+ {
+ case GENERAL:
+ return createForCompaction(params.minCompressionLevel, params.maxCompressionLevel, params.maxCompactionQueueLength);
+ case FAST_COMPRESSION:
+ return createForFlush(params.minCompressionLevel, params.maxCompressionLevel);
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean supports(BufferType bufferType)
+ {
+ return bufferType == BufferType.OFF_HEAP;
+ }
+
+ @Override
+ public Set<String> supportedOptions()
+ {
+ return Set.of(MIN_COMPRESSION_LEVEL_OPTION_NAME, MAX_COMPRESSION_LEVEL_OPTION_NAME, MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME);
+ }
+
+ private static int getMinCompressionLevel(Uses mode, Map<String, String> options)
+ {
+ int defaultValue = mode == Uses.FAST_COMPRESSION ? DEFAULT_MIN_FAST_COMPRESSION_LEVEL : DEFAULT_MIN_GENERAL_COMPRESSION_LEVEL;
+ return getIntOption(options, MIN_COMPRESSION_LEVEL_OPTION_NAME, defaultValue);
+ }
+
+ private static int getMaxCompressionLevel(Uses mode, Map<String, String> options)
+ {
+ var defaultValue = mode == Uses.FAST_COMPRESSION ? DEFAULT_MAX_FAST_COMPRESSION_LEVEL : DEFAULT_MAX_GENERAL_COMPRESSION_LEVEL;
+ return getIntOption(options, MAX_COMPRESSION_LEVEL_OPTION_NAME, defaultValue);
+ }
+
+ private static int getMaxCompactionQueueLength(Map<String, String> options)
+ {
+ return getIntOption(options, MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME, DEFAULT_MAX_COMPACTION_QUEUE_LENGTH);
+ }
+
+ private static int getIntOption(Map<String, String> options, String key, int defaultValue)
+ {
+ if (options == null)
+ return defaultValue;
+
+ String val = options.get(key);
+ if (val == null)
+ return defaultValue;
+
+ return Integer.parseInt(val);
+ }
+
+ private double getWritePressure()
+ {
+ return writePressureSupplier.get();
+ }
+
+ private static double getFlushPressure()
+ {
+ var memoryPool = AbstractAllocatorMemtable.MEMORY_POOL;
+ var usedRatio = Math.max(memoryPool.onHeap.usedRatio(), memoryPool.offHeap.usedRatio());
+ var cleanupThreshold = DatabaseDescriptor.getMemtableCleanupThreshold();
+ // we max out the pressure when we're halfway between the cleanupThreshold and maximum memory,
+ // so we still have some memory left while compression is already working at maximum speed;
+ // switching the compressor to maximum speed only once all memory is exhausted would be too late
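+ // For illustration, with cleanupThreshold = 0.2: usedRatio 0.2 -> pressure 0.0,
+ // usedRatio 0.4 -> pressure 0.5, usedRatio >= 0.6 -> pressure 1.0 (numbers are illustrative only)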
+ return Math.min(1.0, Math.max(0.0, 2 * (usedRatio - cleanupThreshold)) / (1.0 - cleanupThreshold));
+ }
+
+ private static double getCompactionPressure(int maxCompactionQueueLength)
+ {
+ CompactionManager compactionManager = CompactionManager.instance;
+ long rateLimit = DatabaseDescriptor.getCompactionThroughputMbPerSec() * FileUtils.ONE_MB;
+ if (rateLimit == 0)
+ rateLimit = Long.MAX_VALUE;
+ double actualRate = compactionManager.getMetrics().bytesCompactedThroughput.getOneMinuteRate();
+ // We don't want to speed up compression if compaction keeps up with the configured compaction throughput limit.
+ // rateLimitFactor is 0.0 if actualRate >= rateLimit
+ // and 1.0 if actualRate <= 0.8 * rateLimit
+ double rateLimitFactor = Math.min(1.0, Math.max(0.0, (rateLimit - actualRate) / (0.2 * rateLimit)));
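+ // For illustration, with a 64 MiB/s limit: actualRate >= 64 MiB/s -> factor 0.0,
+ // 57.6 MiB/s -> 0.5, <= 51.2 MiB/s -> 1.0 (numbers are illustrative only)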
+
+ long pendingCompactions = compactionManager.getPendingTasks();
+ long activeCompactions = compactionManager.getActiveCompactions();
+ long queuedCompactions = pendingCompactions - activeCompactions;
+ double compactionQueuePressure = Math.min(1.0, (double) queuedCompactions / (maxCompactionQueueLength * DatabaseDescriptor.getConcurrentCompactors()));
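+ // For illustration: 16 queued compactions with max_compaction_queue_length = 16 and 2 concurrent
+ // compactors give queue pressure 16 / 32 = 0.5; combined with a rate limit factor of 0.5 the
+ // overall pressure is 0.25 (numbers are illustrative only)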
+ return compactionQueuePressure * rateLimitFactor;
+ }
+
+ private int clampCompressionLevel(long compressionLevel)
+ {
+ return (int) Math.min(params.maxCompressionLevel, Math.max(params.minCompressionLevel, compressionLevel));
+ }
+
+ @VisibleForTesting
+ State getThreadLocalState()
+ {
+ State state = this.state.get();
+ if (state == null)
+ {
+ state = new State();
+ state.currentCompressionLevel = params.maxCompressionLevel;
+ state.lastCompressionDuration = 0;
+ this.state.set(state);
+ }
+ return state;
+ }
+
+ static class Metrics
+ {
+ private final Counter[] compressionLevelHistogram; // separate counters for each compression level
+ private final Histogram relativeTimeSpentCompressing; // in % (i.e. multiplied by 100 because Histogram can only store integers)
+
+ Metrics(Uses use)
+ {
+ MetricNameFactory factory = new DefaultNameFactory("AdaptiveCompression");
+
+ // cannot use Metrics.histogram for compression levels, because histograms do not handle negative numbers;
+ // also this histogram is small enough that storing all buckets is not a problem, but it gives
+ // much more information
+ compressionLevelHistogram = new Counter[MAX_COMPRESSION_LEVEL + 1];
+ for (int i = 0; i < compressionLevelHistogram.length; i++)
+ {
+ CassandraMetricsRegistry.MetricName metricName = factory.createMetricName(String.format("CompressionLevel_%s_%02d", use.name(), i));
+ compressionLevelHistogram[i] = CassandraMetricsRegistry.Metrics.counter(metricName);
+ }
+
+ relativeTimeSpentCompressing = CassandraMetricsRegistry.Metrics.histogram(factory.createMetricName("RelativeTimeSpentCompressing_" + use.name()), true);
+ }
+
+ void updateFrom(State state)
+ {
+ compressionLevelHistogram[state.currentCompressionLevel].inc();
+ relativeTimeSpentCompressing.update((int)(state.getRelativeTimeSpentCompressing() * 100.0));
+ }
+ }
+}
diff --git a/src/java/org/apache/cassandra/io/compress/ICompressor.java b/src/java/org/apache/cassandra/io/compress/ICompressor.java
index fd6a104431b3..b6f8b2c50eea 100644
--- a/src/java/org/apache/cassandra/io/compress/ICompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/ICompressor.java
@@ -83,4 +83,18 @@ default Set<Uses> recommendedUses()
{
return ImmutableSet.copyOf(EnumSet.allOf(Uses.class));
}
+
+ /**
+ * Returns the compressor configured for a particular use.
+ * Allows a compressor implementation to handle multiple uses while requiring a different configuration
+ * for each particular use.
+ *
+ * May return this object.
+ * Must not modify this object.
+ * Returns null if the request cannot be satisfied.
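+ * For example, a compressor created for GENERAL use may return a differently configured instance
+ * of the same class when asked for FAST_COMPRESSION.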
+ */
+ default ICompressor forUse(Uses use)
+ {
+ return recommendedUses().contains(use) ? this : null;
+ }
}
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
index a8884831a9a8..4b0bbd96dda4 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@@ -43,6 +44,7 @@
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.guardrails.Guardrails;
import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.AdaptiveCompressor;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.sstable.Component;
@@ -153,13 +155,21 @@ public static CompressionParams compressionFor(final OperationType opType, Table
case fast:
if (!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION))
{
- // The default compressor is generally fast (LZ4 with 16KiB block size)
- compressionParams = CompressionParams.DEFAULT;
+ compressionParams = CompressionParams.FAST;
+ break;
+ }
+ // else fall through
+ case adaptive:
+ if (!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION))
+ {
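+ // the table compressor is not suitable for flushing; flush with the adaptive compressor configured for fast compression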
+ compressionParams = CompressionParams.FAST_ADAPTIVE;
break;
}
// else fall through
case table:
default:
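+ // flush with the table's own compressor, letting it specialize itself for flushing if it can (see ICompressor#forUse)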
+ compressionParams = Optional.ofNullable(compressionParams.forUse(ICompressor.Uses.FAST_COMPRESSION))
+ .orElse(compressionParams);
break;
}
}
diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java
index 53760a91893e..22a07a97a582 100644
--- a/src/java/org/apache/cassandra/schema/CompressionParams.java
+++ b/src/java/org/apache/cassandra/schema/CompressionParams.java
@@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
import com.google.common.annotations.VisibleForTesting;
@@ -35,6 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -66,12 +68,26 @@ public final class CompressionParams
public static final String ENABLED = "enabled";
public static final String MIN_COMPRESS_RATIO = "min_compress_ratio";
- public static final CompressionParams DEFAULT = new CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
+ public static final CompressionParams FAST = new CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
DEFAULT_CHUNK_LENGTH,
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
DEFAULT_MIN_COMPRESS_RATIO,
Collections.emptyMap());
+ public static final CompressionParams ADAPTIVE = new CompressionParams(AdaptiveCompressor.create(Collections.emptyMap()),
+ DEFAULT_CHUNK_LENGTH,
+ calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
+ DEFAULT_MIN_COMPRESS_RATIO,
+ Collections.emptyMap());
+
+ public static final CompressionParams FAST_ADAPTIVE = new CompressionParams(AdaptiveCompressor.createForFlush(Collections.emptyMap()),
+ DEFAULT_CHUNK_LENGTH,
+ calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
+ DEFAULT_MIN_COMPRESS_RATIO,
+ Collections.emptyMap());
+
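+ // The default is chosen at startup: ADAPTIVE when -Dcassandra.default_sstable_compression=adaptive is set, otherwise FAST (LZ4)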
+ public static final CompressionParams DEFAULT = DatabaseDescriptor.shouldUseAdaptiveCompressionByDefault() ? ADAPTIVE : FAST;
+
public static final CompressionParams NOOP = new CompressionParams(NoopCompressor.create(Collections.emptyMap()),
// 4 KiB is often the underlying disk block size
1024 * 4,
@@ -249,6 +265,24 @@ public boolean isEnabled()
return sstableCompressor != null;
}
+ /**
+ * Specializes the compressor for the given use.
+ * May cause reconfiguration of parameters on some compressors.
+ * Returns null if the params are not compatible with the given use.
+ */
+ public CompressionParams forUse(ICompressor.Uses use)
+ {
+ ICompressor specializedCompressor = this.sstableCompressor.forUse(use);
+ if (specializedCompressor == null)
+ return null;
+
+ assert specializedCompressor.recommendedUses().contains(use);
+ if (specializedCompressor == sstableCompressor)
+ return this;
+
+ return new CompressionParams(specializedCompressor, chunkLength, maxCompressedLength, minCompressRatio, otherOptions);
+ }
+
/**
* Returns the SSTable compressor.
* @return the SSTable compressor or {@code null} if compression is disabled.
diff --git a/test/unit/org/apache/cassandra/io/compress/AdaptiveCompressorTest.java b/test/unit/org/apache/cassandra/io/compress/AdaptiveCompressorTest.java
new file mode 100644
index 000000000000..b8cf758e933d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/compress/AdaptiveCompressorTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class AdaptiveCompressorTest
+{
+
+ @Test(expected = IllegalArgumentException.class)
+ public void badCompressionLevelParamThrowsExceptionMin()
+ {
+ AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MIN_COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(AdaptiveCompressor.MIN_COMPRESSION_LEVEL - 1)));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void badCompressionLevelParamThrowsExceptionMax()
+ {
+ AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MAX_COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(AdaptiveCompressor.MAX_COMPRESSION_LEVEL + 1)));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void badMaxCompactionQueueLengthParamThrowsExceptionMin()
+ {
+ AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME, "-1"));
+ }
+
+ @Test
+ public void averageRelativeTimeCompressingIsMeasuredProperly() throws IOException, InterruptedException
+ {
+ var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 15, 15, 0);
+ AdaptiveCompressor c1 = new AdaptiveCompressor(params, () -> 0.0);
+ ByteBuffer src = getSrcByteBuffer();
+ ByteBuffer dest = getDstByteBuffer(c1);
+ for (int i = 0; i < 20000; i++)
+ {
+ compress(c1, src, dest);
+ }
+ assertTrue(c1.getThreadLocalState().getRelativeTimeSpentCompressing() > 0.8);
+ assertTrue(c1.getThreadLocalState().getRelativeTimeSpentCompressing() < 1.0);
+
+
+ var params2 = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 0, 0, 0);
+ AdaptiveCompressor c2 = new AdaptiveCompressor(params2, () -> 0.0);
+ for (int i = 0; i < 100; i++)
+ {
+ Thread.sleep(1);
+ compress(c2, src, dest);
+ }
+ assertTrue(c2.getThreadLocalState().getRelativeTimeSpentCompressing() < 0.02);
+ assertTrue(c2.getThreadLocalState().getRelativeTimeSpentCompressing() > 0.0);
+ }
+
+ @Test
+ public void compressionLevelAdaptsToWritePressure() throws IOException
+ {
+ var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 2, 8, 0);
+ double[] load = { 1.0 };
+
+ AdaptiveCompressor c = new AdaptiveCompressor(params, () -> load[0]);
+ ByteBuffer src = getSrcByteBuffer();
+ ByteBuffer dest = getDstByteBuffer(c);
+
+ for (int i = 0; i < 10; i++)
+ compress(c, src, dest);
+
+ assertEquals(2, c.getThreadLocalState().currentCompressionLevel);
+
+ // Low load; compression level must be increased back to max:
+ load[0] = 0.0;
+
+ for (int i = 0; i < 10; i++)
+ compress(c, src, dest);
+
+ assertEquals(8, c.getThreadLocalState().currentCompressionLevel);
+ }
+
+ @Test
+ public void compressionLevelDoesNotDecreaseWhenCompressionIsNotABottleneck() throws IOException, InterruptedException
+ {
+ var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 2, 8, 0);
+ // Simulate high write load
+ AdaptiveCompressor c = new AdaptiveCompressor(params, () -> 1.0);
+ ByteBuffer src = getSrcByteBuffer();
+ ByteBuffer dest = getDstByteBuffer(c);
+
+ for (int i = 0; i < 200; i++)
+ {
+ Thread.sleep(1); // creates artificial bottleneck that is much slower than compression
+ compress(c, src, dest);
+ }
+
+ assertEquals(8, c.getThreadLocalState().currentCompressionLevel);
+ }
+
+ private static ByteBuffer getDstByteBuffer(ICompressor compressor)
+ {
+ return ByteBuffer.allocateDirect(compressor.initialCompressedBufferLength(RandomAccessReader.DEFAULT_BUFFER_SIZE));
+ }
+
+ private static ByteBuffer getSrcByteBuffer()
+ {
+ int n = RandomAccessReader.DEFAULT_BUFFER_SIZE;
+ byte[] srcData = new byte[n];
+ new Random().nextBytes(srcData);
+
+ ByteBuffer src = ByteBuffer.allocateDirect(n);
+ src.put(srcData, 0, n);
+ src.flip().position(0);
+ return src;
+ }
+
+ private static void compress(AdaptiveCompressor c, ByteBuffer src, ByteBuffer dest) throws IOException
+ {
+ c.compress(src, dest);
+ src.rewind();
+ dest.rewind();
+ }
+
+}
diff --git a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
index a1e09ed77895..0f117095b0e6 100644
--- a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
@@ -174,6 +174,39 @@ public void zstdFlushTest() throws Throwable
});
}
+ @Test
+ public void adaptiveFlushTest() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'AdaptiveCompressor'};");
+ DatabaseDescriptor.setFlushCompression(Config.FlushCompression.fast);
+ ColumnFamilyStore store = flushTwice();
+
+ // Should flush as LZ4
+ Set<SSTableReader> sstables = store.getLiveSSTables();
+ sstables.forEach(sstable -> {
+ assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof LZ4Compressor);
+ });
+ store.truncateBlocking();
+
+ DatabaseDescriptor.setFlushCompression(Config.FlushCompression.adaptive);
+ store = flushTwice();
+
+ // Should flush as Adaptive
+ sstables = store.getLiveSSTables();
+ sstables.forEach(sstable -> {
+ assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof AdaptiveCompressor);
+ });
+
+ // Should compact to Adaptive
+ compact();
+
+ sstables = store.getLiveSSTables();
+ assertEquals(1, sstables.size());
+ store.getLiveSSTables().forEach(sstable -> {
+ assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof AdaptiveCompressor);
+ });
+ }
+
@Test
public void deflateFlushTest() throws Throwable
{
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
index a3a64babfa7d..7cf37844892e 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
@@ -29,8 +29,10 @@
import com.google.common.io.Files;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
@@ -41,6 +43,12 @@
public class CompressorTest
{
+ @BeforeClass
+ public static void initialize()
+ {
+ DatabaseDescriptor.daemonInitialization();
+ }
+
ICompressor compressor;
ICompressor[] compressors = new ICompressor[] {
@@ -48,6 +56,7 @@ public class CompressorTest
DeflateCompressor.create(Collections.emptyMap()),
SnappyCompressor.create(Collections.emptyMap()),
ZstdCompressor.create(Collections.emptyMap()),
+ AdaptiveCompressor.createForUnitTesting(),
NoopCompressor.create(Collections.emptyMap())
};
@@ -190,6 +199,13 @@ public void testZstdByteBuffers() throws IOException
testByteBuffers();
}
+ @Test
+ public void testAdaptiveByteBuffers() throws IOException
+ {
+ compressor = AdaptiveCompressor.createForUnitTesting();
+ testByteBuffers();
+ }
+
@Test
public void testNoopByteBuffers() throws IOException
{