diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index b6f4d4bf5b39..b66120417776 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -479,6 +479,8 @@ commitlog_segment_size_in_mb: 32 # none : Flush without compressing blocks but while still doing checksums. # fast : Flush with a fast compressor. If the table is already using a # fast compressor that compressor is used. +# adaptive : Flush with a fast adaptive compressor. If the table is already using a +# fast compressor that compressor is used. # table: Always flush with the same compressor that the table uses. This # was the pre 4.0 behavior. # diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index a091f30b3f7b..c0cad39b5059 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -20,6 +20,7 @@ import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -37,6 +38,7 @@ import org.apache.cassandra.fql.FullQueryLoggerOptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.guardrails.GuardrailsConfig; +import org.apache.cassandra.io.compress.AdaptiveCompressor; import org.apache.cassandra.utils.FBUtilities; /** @@ -290,8 +292,9 @@ public class Config public double commitlog_sync_group_window_in_ms = Double.NaN; public int commitlog_sync_period_in_ms; public int commitlog_segment_size_in_mb = 32; + public ParameterizedClass commitlog_compression; - public FlushCompression flush_compression = FlushCompression.fast; + public FlushCompression flush_compression; public int commitlog_max_compression_buffers_in_pool = 3; public Integer periodic_commitlog_sync_lag_block_in_ms; public TransparentDataEncryptionOptions transparent_data_encryption_options = new TransparentDataEncryptionOptions(); @@ -662,6 +665,7 @@ public 
enum FlushCompression { none, fast, + adaptive, table } diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index ff76681b53a7..55f52464b62d 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -2081,7 +2081,9 @@ public static void setCommitLogCompression(ParameterizedClass compressor) public static Config.FlushCompression getFlushCompression() { - return conf.flush_compression; + return Objects.requireNonNullElseGet(conf.flush_compression, () -> shouldUseAdaptiveCompressionByDefault() + ? Config.FlushCompression.adaptive + : Config.FlushCompression.fast); } public static void setFlushCompression(Config.FlushCompression compression) @@ -2089,7 +2091,12 @@ public static void setFlushCompression(Config.FlushCompression compression) conf.flush_compression = compression; } - /** + public static boolean shouldUseAdaptiveCompressionByDefault() + { + return System.getProperty("cassandra.default_sstable_compression", "fast").equals("adaptive"); + } + + /** * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use * more, depending on how soon the sync policy stops all writing threads. diff --git a/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java b/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java new file mode 100644 index 000000000000..e83a2c859209 --- /dev/null +++ b/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.EnumMap; +import java.util.EnumSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; +import com.github.luben.zstd.Zstd; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.memtable.AbstractAllocatorMemtable; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.metrics.CassandraMetricsRegistry; +import org.apache.cassandra.metrics.DefaultNameFactory; +import org.apache.cassandra.metrics.MetricNameFactory; +import org.apache.cassandra.utils.ExpMovingAverage; + +/** + * A compressor that dynamically adapts the compression level to the load. + * If the system is not heavily loaded by writes, data are compressed using high compression level. + * If the number of compactions or flushes queue up, it decreases the compression level to speed up + * flushing / compacting. + *

+ * Underneath, the ZStandard compressor is used. The compression level can be changed between the frames, + * even when compressing the same sstable file. ZStandard was chosen because at the fast end it + * can reach compression speed of LZ4, but at moderate compression levels it usually offers much better + * compression ratio (typically files are smaller by 20-40%) than LZ4 without compromising compression speed by too much. + *

+ * This compressor can be used for either of the two Uses: FAST_COMPRESSION or GENERAL. + * Each use can have different minimum and maximum compression level limits. + * For FAST_COMPRESSION, the memtable memory usage relative to the memtable cleanup threshold is used as the indicator of write load. + * For GENERAL compression, the number of queued compactions is used as the indicator of write load. + *

+ * Valid compression levels are in range 0..15 (inclusive), where 0 means fastest compression and 15 means slowest/best. + * Usually levels around 7-11 strike the best balance between performance and compression ratio. + * Going above level 12 usually only results in slower compression but not much compression ratio improvement. + *

+ * Caution: This compressor decompresses about 2x-4x slower than LZ4Compressor, regardless of the compression level.
+ * Therefore, it may negatively affect read speed from very read-heavy tables, especially when the chunk-cache
+ * hit ratio is low. In synthetic tests with chunk cache disabled, read throughput turned out to be up to 10%
+ * lower than when using LZ4 on some workloads.
+ */
+public class AdaptiveCompressor implements ICompressor
+{
+    /** Metrics are shared by all compressor instances serving the same use. */
+    @VisibleForTesting
+    static final Map<Uses, Metrics> metrics = new EnumMap<>(Map.of(
+        Uses.FAST_COMPRESSION, new Metrics(Uses.FAST_COMPRESSION),
+        Uses.GENERAL, new Metrics(Uses.GENERAL)
+    ));
+
+    protected static final String MIN_COMPRESSION_LEVEL_OPTION_NAME = "min_compression_level";
+    protected static final String MAX_COMPRESSION_LEVEL_OPTION_NAME = "max_compression_level";
+    protected static final String MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME = "max_compaction_queue_length";
+
+    /**
+     * Maps AdaptiveCompressor compression level to underlying ZStandard compression levels.
+     * This mapping is needed because ZStandard levels are not continuous, zstd level 0 is special and means level 3.
+     * Hence, we just use our own continuous scale starting at 0.
+     */
+    private static final int[] zstdCompressionLevels = {
+        -7, // 0 (very fast but compresses poorly)
+        -6, // 1
+        -5, // 2 (LZ4 level is somewhere here)
+        -4, // 3
+        -3, // 4
+        -2, // 5
+        -1, // 6
+        1,  // 7 (sweet spot area usually here)
+        2,  // 8 (sweet spot area usually here)
+        3,  // 9 (sweet spot area usually here, ~50% slower than LZ4)
+        4,  // 10 (sweet spot area usually here)
+        5,  // 11 (sweet spot area usually here)
+        6,  // 12
+        7,  // 13
+        8,  // 14
+        9,  // 15 (very slow, usually over 10x slower than LZ4)
+    };
+
+    public static final int MIN_COMPRESSION_LEVEL = 0;
+    public static final int MAX_COMPRESSION_LEVEL = 15;
+
+    public static final int DEFAULT_MIN_FAST_COMPRESSION_LEVEL = 2;     // zstd level -5
+    public static final int DEFAULT_MAX_FAST_COMPRESSION_LEVEL = 9;     // zstd level 3
+    public static final int DEFAULT_MIN_GENERAL_COMPRESSION_LEVEL = 7;  // zstd level 1
+    public static final int DEFAULT_MAX_GENERAL_COMPRESSION_LEVEL = 12; // zstd level 6
+    public static final int DEFAULT_MAX_COMPACTION_QUEUE_LENGTH = 16;
+
+    /** Cache of compressors, so that equal parameters always map to the same shared instance. */
+    private static final ConcurrentHashMap<Params, AdaptiveCompressor> instances = new ConcurrentHashMap<>();
+
+    /**
+     * Creates (or returns the cached) compressor for the GENERAL use, driven by compaction pressure.
+     *
+     * @param options compressor options; missing keys (or a null map) fall back to the GENERAL defaults
+     * @throws IllegalArgumentException if any option value is out of its valid range or not an integer
+     */
+    public static AdaptiveCompressor create(Map<String, String> options)
+    {
+        int minCompressionLevel = getMinCompressionLevel(Uses.GENERAL, options);
+        int maxCompressionLevel = getMaxCompressionLevel(Uses.GENERAL, options);
+        int maxCompactionQueueLength = getMaxCompactionQueueLength(options);
+        return createForCompaction(minCompressionLevel, maxCompressionLevel, maxCompactionQueueLength);
+    }
+
+    private static AdaptiveCompressor createForCompaction(int minCompressionLevel, int maxCompressionLevel, int maxCompactionQueueLength)
+    {
+        Params params = new Params(Uses.GENERAL, minCompressionLevel, maxCompressionLevel, maxCompactionQueueLength);
+        // The supplier captures the queue length, hence Params equality must take the queue length into account.
+        Supplier<Double> compactionPressureSupplier = () -> getCompactionPressure(maxCompactionQueueLength);
+        return instances.computeIfAbsent(params, p -> new AdaptiveCompressor(p, compactionPressureSupplier));
+    }
+
+    /**
+     * Creates a compressor that doesn't refer to any other C* components like compaction manager or memory pools.
+     */
+    @VisibleForTesting
+    public static ICompressor createForUnitTesting()
+    {
+        Params params = new Params(Uses.GENERAL, 9, 9, 0);
+        return new AdaptiveCompressor(params, () -> 0.0);
+    }
+
+    /**
+     * Creates (or returns the cached) compressor for the FAST_COMPRESSION use, driven by flush pressure.
+     *
+     * @param options compressor options; missing keys (or a null map) fall back to the FAST_COMPRESSION defaults
+     * @throws IllegalArgumentException if any option value is out of its valid range or not an integer
+     */
+    public static AdaptiveCompressor createForFlush(Map<String, String> options)
+    {
+        int minCompressionLevel = getMinCompressionLevel(Uses.FAST_COMPRESSION, options);
+        int maxCompressionLevel = getMaxCompressionLevel(Uses.FAST_COMPRESSION, options);
+        return createForFlush(minCompressionLevel, maxCompressionLevel);
+    }
+
+    private static AdaptiveCompressor createForFlush(int minCompressionLevel, int maxCompressionLevel)
+    {
+        Params params = new Params(Uses.FAST_COMPRESSION, minCompressionLevel, maxCompressionLevel, 0);
+        return instances.computeIfAbsent(params, p -> new AdaptiveCompressor(p, AdaptiveCompressor::getFlushPressure));
+    }
+
+    private final Params params;
+    private final ThreadLocal<State> state;
+    private final Supplier<Double> writePressureSupplier;
+
+
+    /**
+     * Immutable, validated compressor configuration. Also serves as the key of the instance cache,
+     * so equals/hashCode cover every field.
+     */
+    static class Params
+    {
+        final Uses use;
+        final int minCompressionLevel;
+        final int maxCompressionLevel;
+        final int maxCompactionQueueLength;
+
+        /**
+         * @throws IllegalArgumentException if a compression level is outside
+         *         [MIN_COMPRESSION_LEVEL, MAX_COMPRESSION_LEVEL] or the queue length is negative
+         */
+        Params(Uses use, int minCompressionLevel, int maxCompressionLevel, int maxCompactionQueueLength)
+        {
+            if (minCompressionLevel < MIN_COMPRESSION_LEVEL || minCompressionLevel > MAX_COMPRESSION_LEVEL)
+                throw new IllegalArgumentException("Min compression level " + minCompressionLevel + " out of range" +
+                                                   " [" + MIN_COMPRESSION_LEVEL + ", " + MAX_COMPRESSION_LEVEL + ']');
+            if (maxCompressionLevel < MIN_COMPRESSION_LEVEL || maxCompressionLevel > MAX_COMPRESSION_LEVEL)
+                throw new IllegalArgumentException("Max compression level " + maxCompressionLevel + " out of range" +
+                                                   " [" + MIN_COMPRESSION_LEVEL + ", " + MAX_COMPRESSION_LEVEL + ']');
+            if (maxCompactionQueueLength < 0)
+                throw new IllegalArgumentException("Negative max compaction queue length: " + maxCompactionQueueLength);
+
+            this.use = use;
+            this.minCompressionLevel = minCompressionLevel;
+            this.maxCompressionLevel = maxCompressionLevel;
+            this.maxCompactionQueueLength = maxCompactionQueueLength;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Params params = (Params) o;
+            // maxCompactionQueueLength must take part in equality: it determines the pressure supplier of the
+            // cached instance, so params differing only in queue length must not share a compressor.
+            return minCompressionLevel == params.minCompressionLevel
+                   && maxCompressionLevel == params.maxCompressionLevel
+                   && maxCompactionQueueLength == params.maxCompactionQueueLength
+                   && use == params.use;
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(use, minCompressionLevel, maxCompressionLevel, maxCompactionQueueLength);
+        }
+    }
+
+    /**
+     * Keeps thread local state.
+     * We need this because we want to not only monitor pending flushes/compactions but also how much
+     * time we spend in compression relative to time spent by the thread in non-compression tasks like preparation
+     * of data or writing. Because ICompressor can be shared by multiple threads, we need to keep
+     * track of each thread separately.
+     */
+    class State
+    {
+        /**
+         * AdaptiveCompressor compression level (an index into the zstd level mapping, not a raw zstd level)
+         * that was used when compressing the previous chunk.
+         * Can be adjusted up or down by at most 1 with every next block.
+         */
+        int currentCompressionLevel;
+
+        /**
+         * How much time is spent by the thread in compression relative to the time spent in non-compression code.
+         * Valid range is [0.0, 1.0].
+         * 1.0 means we're doing only compression and nothing else.
+         * 0.0 means we're not spending any time doing compression.
+         * This indicator allows us to detect whether we're bottlenecked by something else than compression,
+         * e.g. by disk I/O or by preparation of data to compress (e.g. iterating the memtable trie).
+         * If this value is low, then there is not much gain in decreasing the compression
+         * level.
+         */
+        ExpMovingAverage relativeTimeSpentCompressing = ExpMovingAverage.decayBy10();
+
+        long lastCompressionStartTime;
+        long lastCompressionDuration;
+
+        /**
+         * Computes the new compression level to use for the next chunk, based on the load.
+         *
+         * @param currentTime current value of the same clock that is passed to {@link #recordCompressionDuration}
+         * @return the adjusted level, always within [minCompressionLevel, maxCompressionLevel] of the params
+         */
+        public int adjustAndGetCompressionLevel(long currentTime)
+        {
+            // The more write "pressure", the faster we want to go, so the lower the desired compression level.
+            double pressure = getWritePressure();
+            assert pressure >= 0.0 && pressure <= 1.0 : "pressure (" + pressure + ") out of valid range [0.0, 1.0]";
+
+            // Use minCompressionLevel when pressure = 1.0, maxCompressionLevel when pressure = 0.0
+            int pressurePoints = (int) (pressure * (params.maxCompressionLevel - params.minCompressionLevel));
+            int compressionLevelTarget = params.maxCompressionLevel - pressurePoints;
+
+            // We use wall clock time and not CPU time, because we also want to include time spent by I/O.
+            // If we're bottlenecked by writing the data to disk, this indicator should be low.
+            // The +1 terms avoid division by zero when two calls observe the same clock value.
+            double relativeTimeSpentCompressing = (double) (1 + lastCompressionDuration) / (1 + currentTime - lastCompressionStartTime);
+
+            // Some smoothing is needed to avoid changing level too fast due to performance hiccups
+            this.relativeTimeSpentCompressing.update(relativeTimeSpentCompressing);
+
+            // If we're under pressure to write data fast, we need to decrease compression level.
+            // But we do that only if we're really spending significant amount of time doing compression.
+            if (compressionLevelTarget < currentCompressionLevel && this.relativeTimeSpentCompressing.get() > 0.1)
+                currentCompressionLevel--;
+            // If we're not under heavy write pressure, or we're spending very little time compressing data,
+            // we can increase the compression level and get some space savings at a low performance overhead:
+            else if (compressionLevelTarget > currentCompressionLevel || this.relativeTimeSpentCompressing.get() < 0.02)
+                currentCompressionLevel++;
+
+            currentCompressionLevel = clampCompressionLevel(currentCompressionLevel);
+            return currentCompressionLevel;
+        }
+
+        /**
+         * Must be called after compressing a chunk,
+         * so we can measure how much time we spend in compressing vs time spent not-compressing.
+         */
+        public void recordCompressionDuration(long startTime, long endTime)
+        {
+            this.lastCompressionDuration = endTime - startTime;
+            this.lastCompressionStartTime = startTime;
+        }
+
+        @VisibleForTesting
+        double getRelativeTimeSpentCompressing()
+        {
+            return this.relativeTimeSpentCompressing.get();
+        }
+    }
+
+    /**
+     * @param params user-provided configuration such as min/max compression level range
+     * @param writePressureSupplier returns a score in range [0.0, 1.0] describing the write load on the system,
+     *                              which is used to control the desired compression level. The target level
+     *                              decreases linearly with the pressure: 0.0 selects the maximum allowed
+     *                              compression level, 1.0 selects the minimum allowed compression level.
+     */
+    @VisibleForTesting
+    AdaptiveCompressor(Params params, Supplier<Double> writePressureSupplier)
+    {
+        this.params = params;
+        this.state = new ThreadLocal<>();
+        this.writePressureSupplier = writePressureSupplier;
+    }
+
+    @Override
+    public int initialCompressedBufferLength(int chunkLength)
+    {
+        return (int) Zstd.compressBound(chunkLength);
+    }
+
+
+    /**
+     * Compresses the input with a level picked from the calling thread's state, then records the time spent
+     * and updates the metrics of this compressor's use.
+     *
+     * @throws IOException wrapping any exception raised while compressing
+     */
+    @Override
+    public void compress(ByteBuffer input, ByteBuffer output) throws IOException
+    {
+        try
+        {
+            State state = getThreadLocalState();
+            long startTime = System.nanoTime();
+            int compressionLevel = zstdCompressionLevels[state.adjustAndGetCompressionLevel(startTime)];
+            Zstd.compress(output, input, compressionLevel, true);
+            long endTime = System.nanoTime();
+            state.recordCompressionDuration(startTime, endTime);
+            metrics.get(params.use).updateFrom(state);
+        }
+        catch (Exception e)
+        {
+            throw new IOException("Compression failed", e);
+        }
+
+    }
+
+    @Override
+    public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
+    {
+        long dsz = Zstd.decompressByteArray(output, outputOffset, output.length - outputOffset,
+                                            input, inputOffset, inputLength);
+
+        if (Zstd.isError(dsz))
+            throw new IOException(String.format("Decompression failed due to %s", Zstd.getErrorName(dsz)));
+
+        return (int) dsz;
+    }
+
+    @Override
+    public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+    {
+        try
+        {
+            Zstd.decompress(output, input);
+        }
+        catch (Exception e)
+        {
+            throw new IOException("Decompression failed", e);
+        }
+    }
+
+    @Override
+    public BufferType preferredBufferType()
+    {
+        return BufferType.OFF_HEAP;
+    }
+
+    /**
+     * A compressor whose minimum level is at most the default fast minimum is recommended for both uses;
+     * otherwise only for the use it was created for.
+     */
+    @Override
+    public Set<Uses> recommendedUses()
+    {
+        return params.minCompressionLevel <= DEFAULT_MIN_FAST_COMPRESSION_LEVEL
+               ? EnumSet.of(Uses.GENERAL, Uses.FAST_COMPRESSION)
+               : EnumSet.of(params.use);
+    }
+
+    /**
+     * Returns this compressor if it already serves the given use, otherwise a (cached) compressor with the same
+     * level limits but driven by the pressure indicator of the requested use. Returns null for an unknown use.
+     */
+    @Override
+    public ICompressor forUse(Uses use)
+    {
+        if (use == params.use)
+            return this;
+
+        switch (use)
+        {
+            case GENERAL:
+                return createForCompaction(params.minCompressionLevel, params.maxCompressionLevel, params.maxCompactionQueueLength);
+            case FAST_COMPRESSION:
+                return createForFlush(params.minCompressionLevel, params.maxCompressionLevel);
+        }
+
+        return null;
+    }
+
+    @Override
+    public boolean supports(BufferType bufferType)
+    {
+        return bufferType == BufferType.OFF_HEAP;
+    }
+
+    /**
+     * Lists every option read by {@link #create(Map)} and {@link #createForFlush(Map)}, including the
+     * compaction queue length, so that none of them is reported as unsupported.
+     */
+    @Override
+    public Set<String> supportedOptions()
+    {
+        return Set.of(MAX_COMPRESSION_LEVEL_OPTION_NAME,
+                      MIN_COMPRESSION_LEVEL_OPTION_NAME,
+                      MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME);
+    }
+
+    private static int getMinCompressionLevel(Uses mode, Map<String, String> options)
+    {
+        int defaultValue = mode == Uses.FAST_COMPRESSION ? DEFAULT_MIN_FAST_COMPRESSION_LEVEL : DEFAULT_MIN_GENERAL_COMPRESSION_LEVEL;
+        return getIntOption(options, MIN_COMPRESSION_LEVEL_OPTION_NAME, defaultValue);
+    }
+
+    private static int getMaxCompressionLevel(Uses mode, Map<String, String> options)
+    {
+        var defaultValue = mode == Uses.FAST_COMPRESSION ? DEFAULT_MAX_FAST_COMPRESSION_LEVEL : DEFAULT_MAX_GENERAL_COMPRESSION_LEVEL;
+        return getIntOption(options, MAX_COMPRESSION_LEVEL_OPTION_NAME, defaultValue);
+    }
+
+    private static int getMaxCompactionQueueLength(Map<String, String> options)
+    {
+        return getIntOption(options, MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME, DEFAULT_MAX_COMPACTION_QUEUE_LENGTH);
+    }
+
+    /**
+     * Reads an integer option, returning the default when the map is null or the key is absent.
+     *
+     * @throws NumberFormatException if the value is present but is not a valid integer
+     */
+    private static int getIntOption(Map<String, String> options, String key, int defaultValue)
+    {
+        if (options == null)
+            return defaultValue;
+
+        String val = options.get(key);
+        if (val == null)
+            return defaultValue;
+
+        return Integer.parseInt(val);
+    }
+
+    private double getWritePressure()
+    {
+        return writePressureSupplier.get();
+    }
+
+    private static double getFlushPressure()
+    {
+        var memoryPool = AbstractAllocatorMemtable.MEMORY_POOL;
+        var usedRatio = Math.max(memoryPool.onHeap.usedRatio(), memoryPool.offHeap.usedRatio());
+        var cleanupThreshold = DatabaseDescriptor.getMemtableCleanupThreshold();
+        // we max out the pressure when we're halfway between the cleanupThreshold and max memory
+        // so we still have some memory left while compression already working at max speed;
+        // setting the compressor to maximum speed when we exhausted all memory would be too late
+        return Math.min(1.0, Math.max(0.0, 2 * (usedRatio - cleanupThreshold)) / (1.0 - cleanupThreshold));
+    }
+
+    /**
+     * Computes the compaction write pressure in range [0.0, 1.0].
+     *
+     * @param maxCompactionQueueLength queue length (per concurrent compactor) at which the pressure maxes out;
+     *                                 0 means that any queued compaction counts as maximum queue pressure
+     */
+    private static double getCompactionPressure(int maxCompactionQueueLength)
+    {
+        CompactionManager compactionManager = CompactionManager.instance;
+        long rateLimit = DatabaseDescriptor.getCompactionThroughputMbPerSec() * FileUtils.ONE_MB;
+        if (rateLimit == 0)
+            rateLimit = Long.MAX_VALUE;
+        double actualRate = compactionManager.getMetrics().bytesCompactedThroughput.getOneMinuteRate();
+        // We don't want to speed up compression if we can keep up with the configured compaction rate limit
+        // 0.0 if actualRate >= rateLimit
+        // 1.0 if actualRate <= 0.8 * rateLimit;
+        double rateLimitFactor = Math.min(1.0, Math.max(0.0, (rateLimit - actualRate) / (0.2 * rateLimit)));
+
+        long pendingCompactions = compactionManager.getPendingTasks();
+        long activeCompactions = compactionManager.getActiveCompactions();
+        long queuedCompactions = pendingCompactions - activeCompactions;
+        // Nothing is waiting (or the two counters are momentarily inconsistent): no pressure.
+        // This also keeps the result from becoming negative or NaN (0 / 0).
+        if (queuedCompactions <= 0)
+            return 0.0;
+
+        // Guard against division by zero when the queue length (or the number of compactors) is 0.
+        long queueLimit = (long) maxCompactionQueueLength * DatabaseDescriptor.getConcurrentCompactors();
+        double compactionQueuePressure = queueLimit <= 0
+                                         ? 1.0
+                                         : Math.min(1.0, (double) queuedCompactions / queueLimit);
+        return compactionQueuePressure * rateLimitFactor;
+    }
+
+    private int clampCompressionLevel(long compressionLevel)
+    {
+        return (int) Math.min(params.maxCompressionLevel, Math.max(params.minCompressionLevel, compressionLevel));
+    }
+
+    /**
+     * Returns the state of the calling thread, creating it on first use.
+     * A new state starts at the maximum allowed compression level.
+     */
+    @VisibleForTesting
+    State getThreadLocalState()
+    {
+        State state = this.state.get();
+        if (state == null)
+        {
+            state = new State();
+            state.currentCompressionLevel = params.maxCompressionLevel;
+            state.lastCompressionDuration = 0;
+            this.state.set(state);
+        }
+        return state;
+    }
+
+    static class Metrics
+    {
+        private final Counter[] compressionLevelHistogram;  // separate counters for each compression level
+        private final Histogram relativeTimeSpentCompressing; // in % (i.e. multiplied by 100 because Histogram can only keep integers)
+
+        Metrics(Uses use)
+        {
+            MetricNameFactory factory = new DefaultNameFactory("AdaptiveCompression");
+
+            // cannot use Metrics.histogram for compression levels, because histograms do not handle negative numbers;
+            // also this histogram is small enough that storing all buckets is not a problem, but it gives
+            // much more information
+            compressionLevelHistogram = new Counter[MAX_COMPRESSION_LEVEL + 1];
+            for (int i = 0; i < compressionLevelHistogram.length; i++)
+            {
+                CassandraMetricsRegistry.MetricName metricName = factory.createMetricName(String.format("CompressionLevel_%s_%02d", use.name(), i));
+                compressionLevelHistogram[i] = CassandraMetricsRegistry.Metrics.counter(metricName);
+            }
+
+            relativeTimeSpentCompressing = CassandraMetricsRegistry.Metrics.histogram(factory.createMetricName("RelativeTimeSpentCompressing_" + use.name()), true);
+        }
+
+        /** Records the level used for the last chunk and the smoothed share of time spent compressing. */
+        void updateFrom(State state)
+        {
+            compressionLevelHistogram[state.currentCompressionLevel].inc();
+            relativeTimeSpentCompressing.update((int)(state.getRelativeTimeSpentCompressing() * 100.0));
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/io/compress/ICompressor.java b/src/java/org/apache/cassandra/io/compress/ICompressor.java
index fd6a104431b3..b6f8b2c50eea 100644
--- a/src/java/org/apache/cassandra/io/compress/ICompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/ICompressor.java
@@ -83,4 +83,18 @@ default Set recommendedUses()
     {
         return ImmutableSet.copyOf(EnumSet.allOf(Uses.class));
     }
+
+    /**
+     * Returns the compressor configured for a particular use.
+     * Allows creating a compressor implementation that can handle multiple uses but requires different configurations
+     * adapted to a particular use.
+     *

+ * May return this object. + * May not modify this object. + * Should return null if the request cannot be satisfied. + */ + default ICompressor forUse(Uses use) + { + return recommendedUses().contains(use) ? this : null; + } } diff --git a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java index a0d2cf290d76..8ccad31a624a 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collection; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -43,6 +44,7 @@ import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.guardrails.Guardrails; import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.AdaptiveCompressor; import org.apache.cassandra.io.compress.CompressedSequentialWriter; import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.sstable.Component; @@ -153,13 +155,21 @@ public static CompressionParams compressionFor(final OperationType opType, Table case fast: if (!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION)) { - // The default compressor is generally fast (LZ4 with 16KiB block size) - compressionParams = CompressionParams.DEFAULT; + compressionParams = CompressionParams.FAST; + break; + } + // else fall through + case adaptive: + if (!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION)) + { + compressionParams = CompressionParams.FAST_ADAPTIVE; break; } // else fall through case table: default: + compressionParams = Optional.ofNullable(compressionParams.forUse(ICompressor.Uses.FAST_COMPRESSION)) + .orElse(compressionParams); break; } } diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java 
b/src/java/org/apache/cassandra/schema/CompressionParams.java index 53760a91893e..22a07a97a582 100644 --- a/src/java/org/apache/cassandra/schema/CompressionParams.java +++ b/src/java/org/apache/cassandra/schema/CompressionParams.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import java.util.concurrent.ThreadLocalRandom; import com.google.common.annotations.VisibleForTesting; @@ -35,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.exceptions.ConfigurationException; @@ -66,12 +68,26 @@ public final class CompressionParams public static final String ENABLED = "enabled"; public static final String MIN_COMPRESS_RATIO = "min_compress_ratio"; - public static final CompressionParams DEFAULT = new CompressionParams(LZ4Compressor.create(Collections.emptyMap()), + public static final CompressionParams FAST = new CompressionParams(LZ4Compressor.create(Collections.emptyMap()), DEFAULT_CHUNK_LENGTH, calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO), DEFAULT_MIN_COMPRESS_RATIO, Collections.emptyMap()); + public static final CompressionParams ADAPTIVE = new CompressionParams(AdaptiveCompressor.create(Collections.emptyMap()), + DEFAULT_CHUNK_LENGTH, + calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO), + DEFAULT_MIN_COMPRESS_RATIO, + Collections.emptyMap()); + + public static final CompressionParams FAST_ADAPTIVE = new CompressionParams(AdaptiveCompressor.createForFlush(Collections.emptyMap()), + DEFAULT_CHUNK_LENGTH, + calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO), + DEFAULT_MIN_COMPRESS_RATIO, + Collections.emptyMap()); + + public static final CompressionParams DEFAULT = 
DatabaseDescriptor.shouldUseAdaptiveCompressionByDefault() ? ADAPTIVE : FAST; + public static final CompressionParams NOOP = new CompressionParams(NoopCompressor.create(Collections.emptyMap()), // 4 KiB is often the underlying disk block size 1024 * 4, @@ -249,6 +265,24 @@ public boolean isEnabled() return sstableCompressor != null; } + /** + * Specializes the compressor for given use. + * May cause reconfiguration of parameters on some compressors. + * Returns null if params are not compatible with the given use. + */ + public CompressionParams forUse(ICompressor.Uses use) + { + ICompressor specializedCompressor = this.sstableCompressor.forUse(use); + if (specializedCompressor == null) + return null; + + assert specializedCompressor.recommendedUses().contains(use); + if (specializedCompressor == sstableCompressor) + return this; + + return new CompressionParams(specializedCompressor, chunkLength, maxCompressedLength, minCompressRatio, otherOptions); + } + /** * Returns the SSTable compressor. * @return the SSTable compressor or {@code null} if compression is disabled. diff --git a/test/unit/org/apache/cassandra/io/compress/AdaptiveCompressorTest.java b/test/unit/org/apache/cassandra/io/compress/AdaptiveCompressorTest.java new file mode 100644 index 000000000000..b8cf758e933d --- /dev/null +++ b/test/unit/org/apache/cassandra/io/compress/AdaptiveCompressorTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class AdaptiveCompressorTest
+{
+
+    @Test(expected = IllegalArgumentException.class)
+    public void badCompressionLevelParamThrowsExceptionMin()
+    {
+        AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MIN_COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(AdaptiveCompressor.MIN_COMPRESSION_LEVEL - 1)));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void badCompressionLevelParamThrowsExceptionMax()
+    {
+        AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MAX_COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(AdaptiveCompressor.MAX_COMPRESSION_LEVEL + 1)));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void badMaxCompactionQueueLengthParamThrowsExceptionMin()
+    {
+        AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME, "-1"));
+    }
+
+    @Test
+    public void averageRelativeTimeCompressingIsMeasuredProperly() throws IOException, InterruptedException
+    {
+        var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 15, 15, 0);
+        AdaptiveCompressor c1 = new AdaptiveCompressor(params, () -> 0.0);
+        ByteBuffer src = getSrcByteBuffer();
+        ByteBuffer dest = getDstByteBuffer(c1);
+        for (int i = 0; i < 20000; i++)
+        {
+            compress(c1, src, dest);
+        }
+        assertTrue(c1.getThreadLocalState().getRelativeTimeSpentCompressing() > 0.8);
+        assertTrue(c1.getThreadLocalState().getRelativeTimeSpentCompressing() < 1.0);
+
+
+        var params2 = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 0, 0, 0);
+        AdaptiveCompressor c2 = new AdaptiveCompressor(params2, () -> 0.0);
+        for (int i = 0; i < 100; i++)
+        {
+            Thread.sleep(1);
+            compress(c2, src, dest);
+        }
+        assertTrue(c2.getThreadLocalState().getRelativeTimeSpentCompressing() < 0.02);
+        assertTrue(c2.getThreadLocalState().getRelativeTimeSpentCompressing() > 0.0);
+    }
+
+    @Test
+    public void compressionLevelAdaptsToWritePressure() throws IOException
+    {
+        var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 2, 8, 0);
+        double[] load = { 1.0 };
+
+        AdaptiveCompressor c = new AdaptiveCompressor(params, () -> load[0]);
+        ByteBuffer src = getSrcByteBuffer();
+        ByteBuffer dest = getDstByteBuffer(c);
+
+        for (int i = 0; i < 10; i++)
+            compress(c, src, dest);
+
+        assertEquals(2, c.getThreadLocalState().currentCompressionLevel);
+
+        // Low load; compression level must be increased back to max:
+        load[0] = 0.0;
+
+        for (int i = 0; i < 10; i++)
+            compress(c, src, dest);
+
+        assertEquals(8, c.getThreadLocalState().currentCompressionLevel);
+    }
+
+    @Test
+    public void compressionLevelDoesNotDecreaseWhenCompressionIsNotABottleneck() throws IOException, InterruptedException
+    {
+        var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 2, 8, 0);
+        // Simulate high write load
+        AdaptiveCompressor c = new AdaptiveCompressor(params, () -> 1.0);
+        ByteBuffer src = getSrcByteBuffer();
+        ByteBuffer dest = getDstByteBuffer(c);
+
+        for (int i = 0; i < 200; i++)
+        {
+            Thread.sleep(1); // creates artificial bottleneck that is much slower than compression
+            compress(c, src, dest);
+        }
+
+        assertEquals(8, c.getThreadLocalState().currentCompressionLevel);
+    }
+
+    private static ByteBuffer getDstByteBuffer(ICompressor compressor)
+    {
+        return ByteBuffer.allocateDirect(compressor.initialCompressedBufferLength(RandomAccessReader.DEFAULT_BUFFER_SIZE));
+    }
+
+    private static ByteBuffer getSrcByteBuffer()
+    {
+        int n = RandomAccessReader.DEFAULT_BUFFER_SIZE;
+        byte[] srcData = new byte[n];
+        new Random().nextBytes(srcData);
+
+        ByteBuffer src = ByteBuffer.allocateDirect(n);
+        src.put(srcData, 0, n);
+        src.flip();
+        return src;
+    }
+
+    private static void compress(AdaptiveCompressor c, ByteBuffer src, ByteBuffer dest) throws IOException
+    {
+        c.compress(src, dest);
+        src.rewind();
+        dest.rewind();
+    }
+
+}
diff --git a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
index a1e09ed77895..0f117095b0e6 100644
--- a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
@@ -174,6 +174,39 @@ public void zstdFlushTest() throws Throwable
         });
     }
 
+    @Test
+    public void adaptiveFlushTest() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'AdaptiveCompressor'};");
+        DatabaseDescriptor.setFlushCompression(Config.FlushCompression.fast);
+        ColumnFamilyStore store = flushTwice();
+
+        // Should flush as LZ4
+        Set<SSTableReader> sstables = store.getLiveSSTables();
+        sstables.forEach(sstable -> {
+            assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof LZ4Compressor);
+        });
+        store.truncateBlocking();
+
+        DatabaseDescriptor.setFlushCompression(Config.FlushCompression.adaptive);
+        store = flushTwice();
+
+        // Should flush as Adaptive
+        sstables = store.getLiveSSTables();
+        sstables.forEach(sstable -> {
+            assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof AdaptiveCompressor);
+        });
+
+        // Should compact to Adaptive
+        compact();
+
+        sstables = store.getLiveSSTables();
+        assertEquals(1, sstables.size());
+        store.getLiveSSTables().forEach(sstable -> {
+            assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof AdaptiveCompressor);
+        });
+    }
+
     @Test
     public void deflateFlushTest() throws Throwable
     {
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
index a3a64babfa7d..7cf37844892e 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
@@ -29,8 +29,10 @@
 import com.google.common.io.Files;
 
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
@@ -41,6 +43,12 @@
 
 public class CompressorTest
 {
+    @BeforeClass
+    public static void initialize()
+    {
+        DatabaseDescriptor.daemonInitialization();
+    }
+
     ICompressor compressor;
 
     ICompressor[] compressors = new ICompressor[] {
@@ -48,6 +56,7 @@ public class CompressorTest
             DeflateCompressor.create(Collections.emptyMap()),
             SnappyCompressor.create(Collections.emptyMap()),
             ZstdCompressor.create(Collections.emptyMap()),
+            AdaptiveCompressor.createForUnitTesting(),
             NoopCompressor.create(Collections.emptyMap())
     };
 
@@ -190,6 +199,13 @@ public void testZstdByteBuffers() throws IOException
         testByteBuffers();
     }
 
+    @Test
+    public void testAdaptiveByteBuffers() throws IOException
+    {
+        compressor = AdaptiveCompressor.createForUnitTesting();
+        testByteBuffers();
+    }
+
     @Test
     public void testNoopByteBuffers() throws IOException
     {