From 9efb5faf9f4926f7e91fd73ee36595316e80d621 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Mon, 18 Nov 2024 12:47:56 +0100 Subject: [PATCH] CNDB-11532: Adaptive compression This commit introduces a new AdaptiveCompressor class. AdaptiveCompressor uses ZStandard compression with a dynamic compression level based on the current write load. AdaptiveCompressor's goal is to provide similar write performance as LZ4Compressor for write heavy workloads, but a significantly better compression ratio for databases with a moderate amount of writes or on systems with a lot of spare CPU power. If the memtable flush queue builds up, and it turns out the compression is a significant bottleneck, then the compression level used for flushing is decreased to gain speed. Similarly, when pending compaction tasks build up, then the compression level used for compaction is decreased. In order to enable adaptive compression: - set `-Dcassandra.default_sstable_compression=adaptive` JVM option to automatically select `AdaptiveCompressor` as the main compressor for flushes and new tables, if not overriden by specific options in cassandra.yaml or table schema - set `flush_compression: adaptive` in cassandra.yaml to enable it for flushing - set `AdaptiveCompressor` in Table options to enable it for compaction Caution: this feature is not turned on by default because it may impact read speed negatively in some rare cases. Fixes https://github.com/riptano/cndb/issues/11532 --- conf/cassandra.yaml | 2 + .../org/apache/cassandra/config/Config.java | 6 +- .../cassandra/config/DatabaseDescriptor.java | 11 +- .../io/compress/AdaptiveCompressor.java | 500 ++++++++++++++++++ .../cassandra/io/compress/ICompressor.java | 14 + .../io/sstable/format/SortedTableWriter.java | 14 +- .../cassandra/schema/CompressionParams.java | 36 +- .../io/compress/AdaptiveCompressorTest.java | 147 +++++ .../io/compress/CQLCompressionTest.java | 33 ++ .../cassandra/io/compress/CompressorTest.java | 16 + 10 files changed, 773 insertions(+), 6 deletions(-) create mode 100644 src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java create mode 100644 test/unit/org/apache/cassandra/io/compress/AdaptiveCompressorTest.java diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index b6f4d4bf5b39..b66120417776 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -479,6 +479,8 @@ commitlog_segment_size_in_mb: 32 # none : Flush without compressing blocks but while still doing checksums. # fast : Flush with a fast compressor. If the table is already using a # fast compressor that compressor is used. +# adaptive : Flush with a fast adaptive compressor. If the table is already using a +# fast compressor that compressor is used. # table: Always flush with the same compressor that the table uses. This # was the pre 4.0 behavior. # diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index a091f30b3f7b..c0cad39b5059 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -20,6 +20,7 @@ import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -37,6 +38,7 @@ import org.apache.cassandra.fql.FullQueryLoggerOptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.guardrails.GuardrailsConfig; +import org.apache.cassandra.io.compress.AdaptiveCompressor; import org.apache.cassandra.utils.FBUtilities; /** @@ -290,8 +292,9 @@ public class Config public double commitlog_sync_group_window_in_ms = Double.NaN; public int commitlog_sync_period_in_ms; public int commitlog_segment_size_in_mb = 32; + public ParameterizedClass commitlog_compression; - public FlushCompression flush_compression = FlushCompression.fast; + public FlushCompression flush_compression; public int commitlog_max_compression_buffers_in_pool = 3; public Integer periodic_commitlog_sync_lag_block_in_ms; public TransparentDataEncryptionOptions transparent_data_encryption_options = new TransparentDataEncryptionOptions(); @@ -662,6 +665,7 @@ public enum FlushCompression { none, fast, + adaptive, table } diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index ff76681b53a7..55f52464b62d 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -2081,7 +2081,9 @@ public static void setCommitLogCompression(ParameterizedClass compressor) public static Config.FlushCompression getFlushCompression() { - return conf.flush_compression; + return Objects.requireNonNullElseGet(conf.flush_compression, () -> shouldUseAdaptiveCompressionByDefault() + ? Config.FlushCompression.adaptive + : Config.FlushCompression.fast); } public static void setFlushCompression(Config.FlushCompression compression) @@ -2089,7 +2091,12 @@ public static void setFlushCompression(Config.FlushCompression compression) conf.flush_compression = compression; } - /** + public static boolean shouldUseAdaptiveCompressionByDefault() + { + return System.getProperty("cassandra.default_sstable_compression", "fast").equals("adaptive"); + } + + /** * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use * more, depending on how soon the sync policy stops all writing threads. diff --git a/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java b/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java new file mode 100644 index 000000000000..e83a2c859209 --- /dev/null +++ b/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.EnumMap; +import java.util.EnumSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; +import com.github.luben.zstd.Zstd; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.memtable.AbstractAllocatorMemtable; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.metrics.CassandraMetricsRegistry; +import org.apache.cassandra.metrics.DefaultNameFactory; +import org.apache.cassandra.metrics.MetricNameFactory; +import org.apache.cassandra.utils.ExpMovingAverage; + +/** + * A compressor that dynamically adapts the compression level to the load. + * If the system is not heavily loaded by writes, data are compressed using high compression level. + * If the number of compactions or flushes queue up, it decreases the compression level to speed up + * flushing / compacting. + *

+ * Underneath, the ZStandard compressor is used. The compression level can be changed between the frames, + * even when compressing the same sstable file. ZStandard was chosen because at the fast end it + * can reach compression speed of LZ4, but at moderate compression levels it usually offers much better + * compression ratio (typically files are smaller by 20-40%) than LZ4 without compromising compression speed by too much. + *

+ * This compressor can be used for either of Uses: FAST_COMPRESSION and GENERAL. + * Each use can have different minimum and maximum compression level limits. + * For FAST_COMPRESSION, the number of pending flushes is used as the indicator of write load. + * For GENERAL compression, the number of pending compactions is used as the indicator of write load. + *

+ * Valid compression levels are in range 0..15 (inclusive), where 0 means fastest compression and 15 means slowest/best. + * Usually levels around 7-11 strike the best balance between performance and compresion ratio. + * Going above level 12 usually only results in slower compression but not much compression ratio improvement. + *

+ * Caution: This compressor decompresses about 2x-4x slower than LZ4Compressor, regardless of the compression level. + * Therefore, it may negatively affect read speed from very read-heavy tables, especially when the chunk-cache + * hit ratio is low. In synthetic tests with chunk cache disabled, read throughput turned out to be up to 10% + * lower than when using LZ4 on some workloads. + */ +public class AdaptiveCompressor implements ICompressor +{ + @VisibleForTesting + static final Map metrics = new EnumMap<>(Map.of( + Uses.FAST_COMPRESSION, new Metrics(Uses.FAST_COMPRESSION), + Uses.GENERAL, new Metrics(Uses.GENERAL) + )); + + protected static final String MIN_COMPRESSION_LEVEL_OPTION_NAME = "min_compression_level"; + protected static final String MAX_COMPRESSION_LEVEL_OPTION_NAME = "max_compression_level"; + protected static final String MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME = "max_compaction_queue_length"; + + /** + * Maps AdaptiveCompressor compression level to underlying ZStandard compression levels. + * This mapping is needed because ZStandard levels are not continuous, zstd level 0 is special and means level 3. + * Hence, we just use our own continuous scale starting at 0. + */ + private static final int[] zstdCompressionLevels = { + -7, // 0 (very fast but compresses poorly) + -6, // 1 + -5, // 2 (LZ4 level is somewhere here) + -4, // 3 + -3, // 4 + -2, // 5 + -1, // 6 + 1, // 7 (sweet spot area usually here) + 2, // 8 (sweet spot area usually here) + 3, // 9 (sweet spot area usually here, ~50% slower than LZ4) + 4, // 10 (sweet spot area usually here) + 5, // 11 (sweet spot area usually here) + 6, // 12 + 7, // 13 + 8, // 14 + 9, // 15 (very slow, usually over 10x slower than LZ4) + }; + + public static final int MIN_COMPRESSION_LEVEL = 0; + public static final int MAX_COMPRESSION_LEVEL = 15; + + public static final int DEFAULT_MIN_FAST_COMPRESSION_LEVEL = 2; // zstd level -5 + public static final int DEFAULT_MAX_FAST_COMPRESSION_LEVEL = 9; // zstd level 5 + public static final int DEFAULT_MIN_GENERAL_COMPRESSION_LEVEL = 7; // zstd level 1 + public static final int DEFAULT_MAX_GENERAL_COMPRESSION_LEVEL = 12; // zstd level 6 + public static final int DEFAULT_MAX_COMPACTION_QUEUE_LENGTH = 16; + + private static final ConcurrentHashMap instances = new ConcurrentHashMap<>(); + + public static AdaptiveCompressor create(Map options) + { + int minCompressionLevel = getMinCompressionLevel(Uses.GENERAL, options); + int maxCompressionLevel = getMaxCompressionLevel(Uses.GENERAL, options); + int maxCompactionQueueLength = getMaxCompactionQueueLength(options); + return createForCompaction(minCompressionLevel, maxCompressionLevel, maxCompactionQueueLength); + } + + private static AdaptiveCompressor createForCompaction(int minCompressionLevel, int maxCompressionLevel, int maxCompactionQueueLength) + { + Params params = new Params(Uses.GENERAL, minCompressionLevel, maxCompressionLevel, maxCompactionQueueLength); + Supplier compactionPressureSupplier = () -> getCompactionPressure(maxCompactionQueueLength); + return instances.computeIfAbsent(params, p -> new AdaptiveCompressor(p, compactionPressureSupplier)); + } + + /** + * Creates a compressor that doesn't refer to any other C* components like compaction manager or memory pools. + */ + @VisibleForTesting + public static ICompressor createForUnitTesting() + { + Params params = new Params(Uses.GENERAL, 9, 9, 0); + return new AdaptiveCompressor(params, () -> 0.0); + } + + public static AdaptiveCompressor createForFlush(Map options) + { + int minCompressionLevel = getMinCompressionLevel(Uses.FAST_COMPRESSION, options); + int maxCompressionLevel = getMaxCompressionLevel(Uses.FAST_COMPRESSION, options); + return createForFlush(minCompressionLevel, maxCompressionLevel); + } + + private static AdaptiveCompressor createForFlush(int minCompressionLevel, int maxCompressionLevel) + { + Params params = new Params(Uses.FAST_COMPRESSION, minCompressionLevel, maxCompressionLevel, 0); + return instances.computeIfAbsent(params, p -> new AdaptiveCompressor(p, AdaptiveCompressor::getFlushPressure)); + } + + private final Params params; + private final ThreadLocal state; + private final Supplier writePressureSupplier; + + + static class Params + { + final Uses use; + final int minCompressionLevel; + final int maxCompressionLevel; + final int maxCompactionQueueLength; + + Params(Uses use, int minCompressionLevel, int maxCompressionLevel, int maxCompactionQueueLength) + { + if (minCompressionLevel < MIN_COMPRESSION_LEVEL || minCompressionLevel > MAX_COMPRESSION_LEVEL) + throw new IllegalArgumentException("Min compression level " + minCompressionLevel + "out of range" + + " [" + MIN_COMPRESSION_LEVEL + ", " + MAX_COMPRESSION_LEVEL + ']'); + if (maxCompressionLevel < MIN_COMPRESSION_LEVEL || maxCompressionLevel > MAX_COMPRESSION_LEVEL) + throw new IllegalArgumentException("Max compression level " + maxCompressionLevel + "out of range" + + " [" + MIN_COMPRESSION_LEVEL + ", " + MAX_COMPRESSION_LEVEL + ']'); + if (maxCompactionQueueLength < 0) + throw new IllegalArgumentException("Negative max compaction queue length: " + maxCompactionQueueLength); + + this.use = use; + this.minCompressionLevel = minCompressionLevel; + this.maxCompressionLevel = maxCompressionLevel; + this.maxCompactionQueueLength = maxCompactionQueueLength; + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + Params params = (Params) o; + return minCompressionLevel == params.minCompressionLevel && maxCompressionLevel == params.maxCompressionLevel && use == params.use; + } + + @Override + public int hashCode() + { + return Objects.hash(use, minCompressionLevel, maxCompressionLevel); + } + } + + /** + * Keeps thread local state. + * We need this because we want to not only monitor pending flushes/compactions but also how much + * time we spend in compression relative to time spent by the thread in non-compression tasks like preparation + * of data or writing. Because ICompressor can be shared by multiple threads, we need to keep + * track of each thread separately. + */ + class State + { + /** + * ZStandard compression level that was used when compressing the previous chunk. + * Can be adjusted up or down by at most 1 with every next block. + */ + int currentCompressionLevel; + + /** + * How much time is spent by the thread in compression relative to the time spent in non-compression code. + * Valid range is [0.0, 1.0]. + * 1.0 means we're doing only compression and nothing else. + * 0.0 means we're not spending any time doing compression. + * This indicator allows us to detect whether we're bottlenecked by something else than compression, + * e.g. by disk I/O or by preparation of data to compress (e.g. iterating the memtable trie). + * If this value is low, then there is not much gain in decreasing the compression + * level. + */ + ExpMovingAverage relativeTimeSpentCompressing = ExpMovingAverage.decayBy10(); + + long lastCompressionStartTime; + long lastCompressionDuration; + + /** + * Computes the new compression level to use for the next chunk, based on the load. + */ + public int adjustAndGetCompressionLevel(long currentTime) + { + // The more write "pressure", the faster we want to go, so the lower the desired compression level. + double pressure = getWritePressure(); + assert pressure >= 0.0 && pressure <= 1.0 : "pressure (" + pressure + ") out of valid range [0.0, 1.0]"; + + // Use minCompressionLevel when pressure = 1.0, maxCompressionLevel when pressure = 0.0 + int pressurePoints = (int) (pressure * (params.maxCompressionLevel - params.minCompressionLevel)); + int compressionLevelTarget = params.maxCompressionLevel - pressurePoints; + + // We use wall clock time and not CPU time, because we also want to include time spent by I/O. + // If we're bottlenecked by writing the data to disk, this indicator should be low. + double relativeTimeSpentCompressing = (double) (1 + lastCompressionDuration) / (1 + currentTime - lastCompressionStartTime); + + // Some smoothing is needed to avoid changing level too fast due to performance hiccups + this.relativeTimeSpentCompressing.update(relativeTimeSpentCompressing); + + // If we're under pressure to write data fast, we need to decrease compression level. + // But we do that only if we're really spending significant amount of time doing compression. + if (compressionLevelTarget < currentCompressionLevel && this.relativeTimeSpentCompressing.get() > 0.1) + currentCompressionLevel--; + // If we're not under heavy write pressure, or we're spending very little time compressing data, + // we can increase the compression level and get some space savings at a low performance overhead: + else if (compressionLevelTarget > currentCompressionLevel || this.relativeTimeSpentCompressing.get() < 0.02) + currentCompressionLevel++; + + currentCompressionLevel = clampCompressionLevel(currentCompressionLevel); + return currentCompressionLevel; + } + + /** + * Must be called after compressing a chunk, + * so we can measure how much time we spend in compressing vs time spent not-compressing. + */ + public void recordCompressionDuration(long startTime, long endTime) + { + this.lastCompressionDuration = endTime - startTime; + this.lastCompressionStartTime = startTime; + } + + @VisibleForTesting + double getRelativeTimeSpentCompressing() + { + return this.relativeTimeSpentCompressing.get(); + } + } + + /** + * @param params user-provided configuration such as min/max compression level range + * @param writePressureSupplier returns a non-negative score determining the write load on the system which + * is used to control the desired compression level. Influences the compression + * level linearly: an inceease of pressure by 1 point causes the target + * compression level to be decreased by 1 point. Zero will select the + * maximum allowed compression level. + */ + @VisibleForTesting + AdaptiveCompressor(Params params, Supplier writePressureSupplier) + { + this.params = params; + this.state = new ThreadLocal<>(); + this.writePressureSupplier = writePressureSupplier; + } + + @Override + public int initialCompressedBufferLength(int chunkLength) + { + return (int) Zstd.compressBound(chunkLength); + } + + + @Override + public void compress(ByteBuffer input, ByteBuffer output) throws IOException + { + try + { + State state = getThreadLocalState(); + long startTime = System.nanoTime(); + int compressionLevel = zstdCompressionLevels[state.adjustAndGetCompressionLevel(startTime)]; + Zstd.compress(output, input, compressionLevel, true); + long endTime = System.nanoTime(); + state.recordCompressionDuration(startTime, endTime); + metrics.get(params.use).updateFrom(state); + } + catch (Exception e) + { + throw new IOException("Compression failed", e); + } + + } + + @Override + public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException + { + long dsz = Zstd.decompressByteArray(output, outputOffset, output.length - outputOffset, + input, inputOffset, inputLength); + + if (Zstd.isError(dsz)) + throw new IOException(String.format("Decompression failed due to %s", Zstd.getErrorName(dsz))); + + return (int) dsz; + } + + @Override + public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException + { + try + { + Zstd.decompress(output, input); + } catch (Exception e) + { + throw new IOException("Decompression failed", e); + } + } + + @Override + public BufferType preferredBufferType() + { + return BufferType.OFF_HEAP; + } + + @Override + public Set recommendedUses() + { + return params.minCompressionLevel <= DEFAULT_MIN_FAST_COMPRESSION_LEVEL + ? EnumSet.of(Uses.GENERAL, Uses.FAST_COMPRESSION) + : EnumSet.of(params.use); + } + + @Override + public ICompressor forUse(Uses use) + { + if (use == params.use) + return this; + + switch (use) + { + case GENERAL: + return createForCompaction(params.minCompressionLevel, params.maxCompressionLevel, params.maxCompactionQueueLength); + case FAST_COMPRESSION: + return createForFlush(params.minCompressionLevel, params.maxCompressionLevel); + } + + return null; + } + + @Override + public boolean supports(BufferType bufferType) + { + return bufferType == BufferType.OFF_HEAP; + } + + @Override + public Set supportedOptions() + { + return Set.of("max_compression_level", "min_compression_level"); + } + + private static int getMinCompressionLevel(Uses mode, Map options) + { + int defaultValue = mode == Uses.FAST_COMPRESSION ? DEFAULT_MIN_FAST_COMPRESSION_LEVEL : DEFAULT_MIN_GENERAL_COMPRESSION_LEVEL; + return getIntOption(options, MIN_COMPRESSION_LEVEL_OPTION_NAME, defaultValue); + } + + private static int getMaxCompressionLevel(Uses mode, Map options) + { + var defaultValue = mode == Uses.FAST_COMPRESSION ? DEFAULT_MAX_FAST_COMPRESSION_LEVEL : DEFAULT_MAX_GENERAL_COMPRESSION_LEVEL; + return getIntOption(options, MAX_COMPRESSION_LEVEL_OPTION_NAME, defaultValue); + } + + private static int getMaxCompactionQueueLength(Map options) + { + return getIntOption(options, MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME, DEFAULT_MAX_COMPACTION_QUEUE_LENGTH); + } + + private static int getIntOption(Map options, String key, int defaultValue) + { + if (options == null) + return defaultValue; + + String val = options.get(key); + if (val == null) + return defaultValue; + + return Integer.parseInt(val); + } + + private double getWritePressure() + { + return writePressureSupplier.get(); + } + + private static double getFlushPressure() + { + var memoryPool = AbstractAllocatorMemtable.MEMORY_POOL; + var usedRatio = Math.max(memoryPool.onHeap.usedRatio(), memoryPool.offHeap.usedRatio()); + var cleanupThreshold = DatabaseDescriptor.getMemtableCleanupThreshold(); + // we max out the pressure when we're halfway between the cleanupThreshold and max memory + // so we still have some memory left while compression already working at max speed; + // setting the compressor to maximum speed when we exhausted all memory would be too late + return Math.min(1.0, Math.max(0.0, 2 * (usedRatio - cleanupThreshold)) / (1.0 - cleanupThreshold)); + } + + private static double getCompactionPressure(int maxCompactionQueueLength) + { + CompactionManager compactionManager = CompactionManager.instance; + long rateLimit = DatabaseDescriptor.getCompactionThroughputMbPerSec() * FileUtils.ONE_MB; + if (rateLimit == 0) + rateLimit = Long.MAX_VALUE; + double actualRate = compactionManager.getMetrics().bytesCompactedThroughput.getOneMinuteRate(); + // We don't want to speed up compression if we can keep up with the configured compression rate limit + // 0.0 if actualRate >= rateLimit + // 1.0 if actualRate <= 0.8 * rateLimit; + double rateLimitFactor = Math.min(1.0, Math.max(0.0, (rateLimit - actualRate) / (0.2 * rateLimit))); + + long pendingCompactions = compactionManager.getPendingTasks(); + long activeCompactions = compactionManager.getActiveCompactions(); + long queuedCompactions = pendingCompactions - activeCompactions; + double compactionQueuePressure = Math.min(1.0, (double) queuedCompactions / (maxCompactionQueueLength * DatabaseDescriptor.getConcurrentCompactors())); + return compactionQueuePressure * rateLimitFactor; + } + + private int clampCompressionLevel(long compressionLevel) + { + return (int) Math.min(params.maxCompressionLevel, Math.max(params.minCompressionLevel, compressionLevel)); + } + + @VisibleForTesting + State getThreadLocalState() + { + State state = this.state.get(); + if (state == null) + { + state = new State(); + state.currentCompressionLevel = params.maxCompressionLevel; + state.lastCompressionDuration = 0; + this.state.set(state); + } + return state; + } + + static class Metrics + { + private final Counter[] compressionLevelHistogram; // separate counters for each compression level + private final Histogram relativeTimeSpentCompressing; // in % (i.e. multiplied by 100 becaue Histogram can only keep integers) + + Metrics(Uses use) + { + MetricNameFactory factory = new DefaultNameFactory("AdaptiveCompression"); + + // cannot use Metrics.histogram for compression levels, because histograms do not handle negative numbers; + // also this histogram is small enough that storing all buckets is not a problem, but it gives + // much more information + compressionLevelHistogram = new Counter[MAX_COMPRESSION_LEVEL + 1]; + for (int i = 0; i < compressionLevelHistogram.length; i++) + { + CassandraMetricsRegistry.MetricName metricName = factory.createMetricName(String.format("CompressionLevel_%s_%02d", use.name(), i)); + compressionLevelHistogram[i] = CassandraMetricsRegistry.Metrics.counter(metricName); + } + + relativeTimeSpentCompressing = CassandraMetricsRegistry.Metrics.histogram(factory.createMetricName("RelativeTimeSpentCompressing_" + use.name()), true); + } + + void updateFrom(State state) + { + compressionLevelHistogram[state.currentCompressionLevel].inc(); + relativeTimeSpentCompressing.update((int)(state.getRelativeTimeSpentCompressing() * 100.0)); + } + } +} diff --git a/src/java/org/apache/cassandra/io/compress/ICompressor.java b/src/java/org/apache/cassandra/io/compress/ICompressor.java index fd6a104431b3..b6f8b2c50eea 100644 --- a/src/java/org/apache/cassandra/io/compress/ICompressor.java +++ b/src/java/org/apache/cassandra/io/compress/ICompressor.java @@ -83,4 +83,18 @@ default Set recommendedUses() { return ImmutableSet.copyOf(EnumSet.allOf(Uses.class)); } + + /** + * Returns the compressor configured for a particular use. + * Allows creating a compressor implementation that can handle multiple uses but requires different configurations + * adapted to a particular use. + *

+ * May return this object. + * May not modify this object. + * Should return null if the request cannot be satisfied. + */ + default ICompressor forUse(Uses use) + { + return recommendedUses().contains(use) ? this : null; + } } diff --git a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java index a0d2cf290d76..8ccad31a624a 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collection; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -43,6 +44,7 @@ import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.guardrails.Guardrails; import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.AdaptiveCompressor; import org.apache.cassandra.io.compress.CompressedSequentialWriter; import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.sstable.Component; @@ -153,13 +155,21 @@ public static CompressionParams compressionFor(final OperationType opType, Table case fast: if (!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION)) { - // The default compressor is generally fast (LZ4 with 16KiB block size) - compressionParams = CompressionParams.DEFAULT; + compressionParams = CompressionParams.FAST; + break; + } + // else fall through + case adaptive: + if (!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION)) + { + compressionParams = CompressionParams.FAST_ADAPTIVE; break; } // else fall through case table: default: + compressionParams = Optional.ofNullable(compressionParams.forUse(ICompressor.Uses.FAST_COMPRESSION)) + .orElse(compressionParams); break; } } diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java index 53760a91893e..22a07a97a582 100644 --- a/src/java/org/apache/cassandra/schema/CompressionParams.java +++ b/src/java/org/apache/cassandra/schema/CompressionParams.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import java.util.concurrent.ThreadLocalRandom; import com.google.common.annotations.VisibleForTesting; @@ -35,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.exceptions.ConfigurationException; @@ -66,12 +68,26 @@ public final class CompressionParams public static final String ENABLED = "enabled"; public static final String MIN_COMPRESS_RATIO = "min_compress_ratio"; - public static final CompressionParams DEFAULT = new CompressionParams(LZ4Compressor.create(Collections.emptyMap()), + public static final CompressionParams FAST = new CompressionParams(LZ4Compressor.create(Collections.emptyMap()), DEFAULT_CHUNK_LENGTH, calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO), DEFAULT_MIN_COMPRESS_RATIO, Collections.emptyMap()); + public static final CompressionParams ADAPTIVE = new CompressionParams(AdaptiveCompressor.create(Collections.emptyMap()), + DEFAULT_CHUNK_LENGTH, + calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO), + DEFAULT_MIN_COMPRESS_RATIO, + Collections.emptyMap()); + + public static final CompressionParams FAST_ADAPTIVE = new CompressionParams(AdaptiveCompressor.createForFlush(Collections.emptyMap()), + DEFAULT_CHUNK_LENGTH, + calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO), + DEFAULT_MIN_COMPRESS_RATIO, + Collections.emptyMap()); + + public static final CompressionParams DEFAULT = DatabaseDescriptor.shouldUseAdaptiveCompressionByDefault() ? ADAPTIVE : FAST; + public static final CompressionParams NOOP = new CompressionParams(NoopCompressor.create(Collections.emptyMap()), // 4 KiB is often the underlying disk block size 1024 * 4, @@ -249,6 +265,24 @@ public boolean isEnabled() return sstableCompressor != null; } + /** + * Specializes the compressor for given use. + * May cause reconfiguration of parameters on some compressors. + * Returns null if params are not compatible with the given use. + */ + public CompressionParams forUse(ICompressor.Uses use) + { + ICompressor specializedCompressor = this.sstableCompressor.forUse(use); + if (specializedCompressor == null) + return null; + + assert specializedCompressor.recommendedUses().contains(use); + if (specializedCompressor == sstableCompressor) + return this; + + return new CompressionParams(specializedCompressor, chunkLength, maxCompressedLength, minCompressRatio, otherOptions); + } + /** * Returns the SSTable compressor. * @return the SSTable compressor or {@code null} if compression is disabled. diff --git a/test/unit/org/apache/cassandra/io/compress/AdaptiveCompressorTest.java b/test/unit/org/apache/cassandra/io/compress/AdaptiveCompressorTest.java new file mode 100644 index 000000000000..b8cf758e933d --- /dev/null +++ b/test/unit/org/apache/cassandra/io/compress/AdaptiveCompressorTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; + +import com.google.common.collect.ImmutableMap; +import org.junit.Test; + +import org.apache.cassandra.io.util.RandomAccessReader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class AdaptiveCompressorTest +{ + + @Test(expected = IllegalArgumentException.class) + public void badCompressionLevelParamThrowsExceptionMin() + { + AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MIN_COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(AdaptiveCompressor.MIN_COMPRESSION_LEVEL - 1))); + } + + @Test(expected = IllegalArgumentException.class) + public void badCompressionLevelParamThrowsExceptionMax() + { + AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MAX_COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(AdaptiveCompressor.MAX_COMPRESSION_LEVEL + 1))); + } + + @Test(expected = IllegalArgumentException.class) + public void badMaxCompactionQueueLengthParamThrowsExceptionMin() + { + AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME, "-1")); + } + + @Test + public void averageRelativeTimeCompressingIsMeasuredProperly() throws IOException, InterruptedException + { + var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 15, 15, 0); + AdaptiveCompressor c1 = new AdaptiveCompressor(params, () -> 0.0); + ByteBuffer src = getSrcByteBuffer(); + ByteBuffer dest = getDstByteBuffer(c1); + for (int i = 0; i < 20000; i++) + { + compress(c1, src, dest); + } + assertTrue(c1.getThreadLocalState().getRelativeTimeSpentCompressing() > 0.8); + assertTrue(c1.getThreadLocalState().getRelativeTimeSpentCompressing() < 1.0); + + + var params2 = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 0, 0, 0); + AdaptiveCompressor c2 = new AdaptiveCompressor(params2, () -> 0.0); + for (int i = 0; i < 100; i++) + { + Thread.sleep(1); + compress(c2, src, dest); + } + assertTrue(c2.getThreadLocalState().getRelativeTimeSpentCompressing() < 0.02); + assertTrue(c2.getThreadLocalState().getRelativeTimeSpentCompressing() > 0.0); + } + + @Test + public void compressionLevelAdaptsToWritePressure() throws IOException + { + var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 2, 8, 0); + double[] load = { 1.0 }; + + AdaptiveCompressor c = new AdaptiveCompressor(params, () -> load[0]); + ByteBuffer src = getSrcByteBuffer(); + ByteBuffer dest = getDstByteBuffer(c); + + for (int i = 0; i < 10; i++) + compress(c, src, dest); + + assertEquals(2, c.getThreadLocalState().currentCompressionLevel); + + // Low load; compression level must be increased back to max: + load[0] = 0L; + + for (int i = 0; i < 10; i++) + compress(c, src, dest); + + assertEquals(8, c.getThreadLocalState().currentCompressionLevel); + } + + @Test + public void compressionLevelDoesNotDecreaseWhenCompressionIsNotABottleneck() throws IOException, InterruptedException + { + var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 2, 8, 0); + // Simulate high write load + AdaptiveCompressor c = new AdaptiveCompressor(params, () -> 1.0); + ByteBuffer src = getSrcByteBuffer(); + ByteBuffer dest = getDstByteBuffer(c); + + for (int i = 0; i < 200; i++) + { + Thread.sleep(1); // creates artificial bottleneck that is much slower than compression + compress(c, src, dest); + } + + assertEquals(8, c.getThreadLocalState().currentCompressionLevel); + } + + private static ByteBuffer getDstByteBuffer(ICompressor compressor) + { + return ByteBuffer.allocateDirect(compressor.initialCompressedBufferLength(RandomAccessReader.DEFAULT_BUFFER_SIZE)); + } + + private static ByteBuffer getSrcByteBuffer() + { + int n = RandomAccessReader.DEFAULT_BUFFER_SIZE; + byte[] srcData = new byte[n]; + new Random().nextBytes(srcData); + + ByteBuffer src = ByteBuffer.allocateDirect(n); + src.put(srcData, 0, n); + src.flip().position(0); + return src; + } + + private static void compress(AdaptiveCompressor c, ByteBuffer src, ByteBuffer dest) throws IOException + { + c.compress(src, dest); + src.rewind(); + dest.rewind(); + } + +} diff --git a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java index a1e09ed77895..0f117095b0e6 100644 --- a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java @@ -174,6 +174,39 @@ public void zstdFlushTest() throws Throwable }); } + @Test + public void adaptiveFlushTest() throws Throwable + { + createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'AdaptiveCompressor'};"); + DatabaseDescriptor.setFlushCompression(Config.FlushCompression.fast); + ColumnFamilyStore store = flushTwice(); + + // Should flush as LZ4 + Set sstables = store.getLiveSSTables(); + sstables.forEach(sstable -> { + assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof LZ4Compressor); + }); + store.truncateBlocking(); + + DatabaseDescriptor.setFlushCompression(Config.FlushCompression.adaptive); + store = flushTwice(); + + // Should flush as Adaptive + sstables = store.getLiveSSTables(); + sstables.forEach(sstable -> { + assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof AdaptiveCompressor); + }); + + // Should compact to Adaptive + compact(); + + sstables = store.getLiveSSTables(); + assertEquals(1, sstables.size()); + store.getLiveSSTables().forEach(sstable -> { + assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof AdaptiveCompressor); + }); + } + @Test public void deflateFlushTest() throws Throwable { diff --git a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java index a3a64babfa7d..7cf37844892e 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java @@ -29,8 +29,10 @@ import com.google.common.io.Files; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; @@ -41,6 +43,12 @@ public class CompressorTest { + @BeforeClass + public static void initialize() + { + DatabaseDescriptor.daemonInitialization(); + } + ICompressor compressor; ICompressor[] compressors = new ICompressor[] { @@ -48,6 +56,7 @@ public class CompressorTest DeflateCompressor.create(Collections.emptyMap()), SnappyCompressor.create(Collections.emptyMap()), ZstdCompressor.create(Collections.emptyMap()), + AdaptiveCompressor.createForUnitTesting(), NoopCompressor.create(Collections.emptyMap()) }; @@ -190,6 +199,13 @@ public void testZstdByteBuffers() throws IOException testByteBuffers(); } + @Test + public void testAdaptiveByteBuffers() throws IOException + { + compressor = AdaptiveCompressor.createForUnitTesting(); + testByteBuffers(); + } + @Test public void testNoopByteBuffers() throws IOException {