From 0f56d59643187ddc4d1c86edfecc1eed6863051c Mon Sep 17 00:00:00 2001 From: maoling Date: Wed, 12 Jun 2024 23:14:00 +0800 Subject: [PATCH 1/3] Partially port CASSANDRA-13890 from apache/cassandra What was ported: - current compaction throughput measurement by CompactionManager - exposing current compaction throughput in StorageService and CompactionMetrics - nodetool getcompactionthroughput, including tests Not ported: - changes to `nodetool compactionstats`, because that would require porting also the tests which are currently missing in CC and porting those tests turned out to be a complex task without porting the other changes in the CompactionManager API - Code for getting / setting compaction throughput as double --- CHANGES.txt | 2 + .../db/compaction/CompactionManager.java | 14 +- .../db/compaction/CompactionTask.java | 3 +- .../cassandra/metrics/CompactionMetrics.java | 3 + .../cassandra/service/StorageService.java | 17 +++ .../service/StorageServiceMBean.java | 1 + .../org/apache/cassandra/tools/NodeProbe.java | 5 + .../nodetool/GetCompactionThroughput.java | 11 +- .../SetGetCompactionThroughputTest.java | 122 ++++++++++++++++++ 9 files changed, 170 insertions(+), 8 deletions(-) create mode 100644 test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java diff --git a/CHANGES.txt b/CHANGES.txt index abd15011268f..59036cd5c32d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,7 @@ Future version (tbd) * Require only MODIFY permission on base when updating table with MV (STAR-564) +Merged from 5.1: + * Expose current compaction throughput in nodetool (CASSANDRA-13890) Merged from 5.0: * Disable chronicle analytics (CASSANDRA-19656) * Remove mocking in InternalNodeProbe spying on StorageServiceMBean (CASSANDRA-18152) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 2adaea3eb168..0b2da3297d79 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -65,7 +65,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import com.codahale.metrics.Meter; import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; @@ -290,6 +290,11 @@ public void setRate(final double throughPutMbPerSec) compactionRateLimiter.setRate(throughput); } + public Meter getCompactionThroughput() + { + return metrics.bytesCompactedThroughput; + } + /** * Call this whenever a compaction might be needed on the given column family store. 
* It's okay to over-call (within reason) if a call is unnecessary, it will @@ -314,7 +319,7 @@ public CompletableFuture[] startCompactionTasks(ColumnFamilyStore cfs, Collec { return backgroundCompactionRunner.startCompactionTasks(cfs, tasks); } - + public int getOngoingBackgroundUpgradesCount() { return backgroundCompactionRunner.getOngoingUpgradesCount(); @@ -1373,7 +1378,7 @@ private void doCleanupOne(final ColumnFamilyStore cfs, } - static boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio) + protected boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio) { if (DatabaseDescriptor.getCompactionThroughputMbPerSec() == 0) return false; @@ -1386,8 +1391,9 @@ static boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScann return actuallyAcquire(limiter, lengthRead); } - private static boolean actuallyAcquire(RateLimiter limiter, long lengthRead) + private boolean actuallyAcquire(RateLimiter limiter, long lengthRead) { + metrics.bytesCompactedThroughput.mark(lengthRead); while (lengthRead >= Integer.MAX_VALUE) { limiter.acquire(Integer.MAX_VALUE); diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 9e07eeec5d19..ef7a51603cb8 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -63,7 +63,6 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.COMPACTION_HISTORY_ENABLED; import static org.apache.cassandra.config.CassandraRelevantProperties.CURSORS_ENABLED; -import static org.apache.cassandra.db.compaction.CompactionManager.compactionRateLimiterAcquire; import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemoryPerSecond; @@ -658,7 +657,7 @@ void execute0() long bytesScanned = compactionIterator.getTotalBytesScanned(); // Rate limit the scanners, and account for compression - if (compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio)) + if (CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio)) lastBytesScanned = bytesScanned; maybeStopOrUpdateState(); diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java index c11476122076..45552e594fe2 100644 --- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java @@ -66,6 +66,8 @@ public class CompactionMetrics public final Counter totalCompactionsFailed; /** Total number of bytes processed by operations since server [re]start */ public final Counter bytesCompacted; + /** Recent/current throughput of compactions take */ + public final Meter bytesCompactedThroughput; /** * The compaction strategy information for each table. Cached, because its computation might be fairly expensive. 
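For context, a minimal sketch (the helper name is illustrative, not part of this patch) of how the new meter is meant to be consumed: the Meter is marked with compacted bytes, so its 1/5/15-minute rates are bytes per second, and the StorageService change below divides them by ONE_MB to report MiB/s.

    import com.codahale.metrics.Meter;
    import org.apache.cassandra.db.compaction.CompactionManager;
    import static org.apache.cassandra.io.util.FileUtils.ONE_MB;

    public final class CompactionThroughputSample
    {
        // Mirrors StorageService.getCurrentCompactionThroughputMebibytesPerSec(): Meter rates are bytes/s.
        public static double oneMinuteMiBPerSec()
        {
            Meter rate = CompactionManager.instance.getCompactionThroughput();
            return rate.getOneMinuteRate() / ONE_MB;
        }
    }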
@@ -190,6 +192,7 @@ public Long getValue() totalCompactionsCompleted = Metrics.meter(factory.createMetricName("TotalCompactionsCompleted")); totalCompactionsFailed = Metrics.counter(factory.createMetricName("FailedCompactions")); bytesCompacted = Metrics.counter(factory.createMetricName("BytesCompacted")); + bytesCompactedThroughput = Metrics.meter(factory.createMetricName("BytesCompactedThroughput")); // compaction failure metrics compactionsReduced = Metrics.counter(factory.createMetricName("CompactionsReduced")); diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index ba57b3ad0db8..b92670198523 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -96,6 +96,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.codahale.metrics.Meter; import org.apache.cassandra.audit.AuditLogManager; import org.apache.cassandra.audit.AuditLogOptions; import org.apache.cassandra.auth.AuthKeyspace; @@ -233,6 +234,7 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.REPLACEMENT_ALLOW_EMPTY; import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName; import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily; +import static org.apache.cassandra.io.util.FileUtils.ONE_MB; import static org.apache.cassandra.net.NoPayload.noPayload; import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ; import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; @@ -1650,6 +1652,21 @@ public void setCompactionThroughputMbPerSec(int value) CompactionManager.instance.setRate(value); } + /** + * Get the Current Compaction Throughput + * key is 1/5/15minute time dimension for statistics + * value is the metric double string (unit is:mib/s) + */ + public Map getCurrentCompactionThroughputMebibytesPerSec() + { + HashMap result = new LinkedHashMap<>(); + Meter rate = CompactionManager.instance.getCompactionThroughput(); + result.put("1minute", String.format("%.3f", rate.getOneMinuteRate() / ONE_MB)); + result.put("5minute", String.format("%.3f", rate.getFiveMinuteRate() / ONE_MB)); + result.put("15minute", String.format("%.3f", rate.getFifteenMinuteRate() / ONE_MB)); + return result; + } + public int getBatchlogReplayThrottleInKB() { return DatabaseDescriptor.getBatchlogReplayThrottleInKB(); diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 96d512e6284e..8dded652c07a 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -631,6 +631,7 @@ public interface StorageServiceMBean extends NotificationEmitter public int getCompactionThroughputMbPerSec(); public void setCompactionThroughputMbPerSec(int value); + Map getCurrentCompactionThroughputMebibytesPerSec(); public int getBatchlogReplayThrottleInKB(); public void setBatchlogReplayThrottleInKB(int value); diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 75b6a2775205..098124c6e4aa 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1138,6 +1138,11 @@ public int getCompactionThroughput() return ssProxy.getCompactionThroughputMbPerSec(); } + public Map 
getCurrentCompactionThroughputMiBPerSec() + { + return ssProxy.getCurrentCompactionThroughputMebibytesPerSec(); + } + public void setBatchlogReplayThrottle(int value) { ssProxy.setBatchlogReplayThrottleInKB(value); diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java index 839c78d3195b..8c17ace1882c 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java +++ b/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java @@ -17,17 +17,24 @@ */ package org.apache.cassandra.tools.nodetool; +import java.util.Map; + import io.airlift.airline.Command; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; -@Command(name = "getcompactionthroughput", description = "Print the MB/s throughput cap for compaction in the system") +@Command(name = "getcompactionthroughput", description = "Print the MiB/s throughput cap for compaction in the system") public class GetCompactionThroughput extends NodeToolCmd { @Override public void execute(NodeProbe probe) { - probe.output().out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MB/s"); + probe.output().out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MiB/s"); + + Map currentCompactionThroughputMetricsMap = probe.getCurrentCompactionThroughputMiBPerSec(); + probe.output().out.println("Current compaction throughput (1 minute): " + currentCompactionThroughputMetricsMap.get("1minute") + " MiB/s"); + probe.output().out.println("Current compaction throughput (5 minute): " + currentCompactionThroughputMetricsMap.get("5minute") + " MiB/s"); + probe.output().out.println("Current compaction throughput (15 minute): " + currentCompactionThroughputMetricsMap.get("15minute") + " MiB/s"); } } diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java new file mode 100644 index 000000000000..76c75350121f --- /dev/null +++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tools.nodetool; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; + +import static org.apache.cassandra.tools.ToolRunner.ToolResult; +import static org.apache.cassandra.tools.ToolRunner.invokeNodetool; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@code nodetool setcompactionthroughput} and {@code nodetool getcompactionthroughput}. + */ +public class SetGetCompactionThroughputTest extends CQLTester +{ + private static final int MAX_INT_CONFIG_VALUE_IN_MBIT = Integer.MAX_VALUE - 1; + + @BeforeClass + public static void setup() throws Exception + { + requireNetwork(); + startJMXServer(); + } + + @Test + public void testNull() + { + assertSetInvalidThroughput(null, "Required parameters are missing: compaction_throughput"); + } + + @Test + public void testPositive() + { + assertSetGetValidThroughput(7); + } + + @Test + public void testMaxValue() + { + assertSetGetValidThroughput(MAX_INT_CONFIG_VALUE_IN_MBIT); + } + + @Test + public void testZero() + { + assertSetGetValidThroughput(0); + } + + @Test + public void testUnparseable() + { + assertSetInvalidThroughput("1.2", "compaction_throughput: can not convert \"1.2\" to a Integer"); + assertSetInvalidThroughput("value", "compaction_throughput: can not convert \"value\" to a Integer"); + } + + @Test + public void testCurrentCompactionThroughput() + { + ToolResult tool = invokeNodetool("getcompactionthroughput"); + tool.assertOnCleanExit(); + + assertThat(tool.getStdout()).containsPattern("Current compaction throughput \\(1 minute\\): \\d+\\.\\d+ MiB/s"); + assertThat(tool.getStdout()).containsPattern("Current compaction throughput \\(5 minute\\): \\d+\\.\\d+ MiB/s"); + assertThat(tool.getStdout()).containsPattern("Current compaction throughput \\(15 minute\\): \\d+\\.\\d+ MiB/s"); + } + + private static void assertSetGetValidThroughput(int throughput) + { + ToolResult tool = invokeNodetool("setcompactionthroughput", String.valueOf(throughput)); + tool.assertOnCleanExit(); + assertThat(tool.getStdout()).isEmpty(); + + assertGetThroughput(throughput); + } + + private static void assertSetInvalidThroughput(String throughput, String expectedErrorMessage) + { + ToolResult tool = throughput == null ? invokeNodetool("setcompactionthroughput") + : invokeNodetool("setcompactionthroughput", throughput); + assertThat(tool.getExitCode()).isEqualTo(1); + assertThat(tool.getStdout()).contains(expectedErrorMessage); + } + + private static void assertSetInvalidThroughputMib(String throughput) + { + ToolResult tool = invokeNodetool("setcompactionthroughput", throughput); + assertThat(tool.getExitCode()).isEqualTo(1); + assertThat(tool.getStdout()).contains("compaction_throughput: 2147483647 is too large; it should be less than" + + " 2147483647 in MiB/s"); + } + + private static void assertGetThroughput(int expected) + { + ToolResult tool = invokeNodetool("getcompactionthroughput"); + tool.assertOnCleanExit(); + + if (expected > 0) + assertThat(tool.getStdout()).contains("Current compaction throughput: " + expected + " MiB/s"); + else + assertThat(tool.getStdout()).contains("Current compaction throughput: 0 MiB/s"); + } +} From 90da921a97d3a62aedfdd178efcccbd5ec83a7cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Mon, 18 Nov 2024 12:47:56 +0100 Subject: [PATCH 2/3] CNDB-11532: Adaptive compression This commit introduces a new AdaptiveCompressor class. 
AdaptiveCompressor uses ZStandard compression with a dynamic compression level based on the current write load. AdaptiveCompressor's goal is to provide similar write performance as LZ4Compressor for write heavy workloads, but a significantly better compression ratio for databases with a moderate amount of writes or on systems with a lot of spare CPU power. If the memtable flush queue builds up, and it turns out the compression is a significant bottleneck, then the compression level used for flushing is decreased to gain speed. Similarly, when pending compaction tasks build up, then the compression level used for compaction is decreased. In order to enable adaptive compression: - set `-Dcassandra.default_sstable_compression=adaptive` JVM option to automatically select `AdaptiveCompressor` as the main compressor for flushes and new tables, if not overriden by specific options in cassandra.yaml or table schema - set `flush_compression: adaptive` in cassandra.yaml to enable it for flushing - set `AdaptiveCompressor` in Table options to enable it for compaction Caution: this feature is not turned on by default because it may impact read speed negatively in some rare cases. Fixes https://github.com/riptano/cndb/issues/11532 --- conf/cassandra.yaml | 2 + .../org/apache/cassandra/config/Config.java | 6 +- .../cassandra/config/DatabaseDescriptor.java | 11 +- .../io/compress/AdaptiveCompressor.java | 500 ++++++++++++++++++ .../cassandra/io/compress/ICompressor.java | 14 + .../io/sstable/format/SortedTableWriter.java | 14 +- .../cassandra/schema/CompressionParams.java | 36 +- .../io/compress/AdaptiveCompressorTest.java | 147 +++++ .../io/compress/CQLCompressionTest.java | 33 ++ .../cassandra/io/compress/CompressorTest.java | 16 + 10 files changed, 773 insertions(+), 6 deletions(-) create mode 100644 src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java create mode 100644 test/unit/org/apache/cassandra/io/compress/AdaptiveCompressorTest.java diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index b6f4d4bf5b39..b66120417776 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -479,6 +479,8 @@ commitlog_segment_size_in_mb: 32 # none : Flush without compressing blocks but while still doing checksums. # fast : Flush with a fast compressor. If the table is already using a # fast compressor that compressor is used. +# adaptive : Flush with a fast adaptive compressor. If the table is already using a +# fast compressor that compressor is used. # table: Always flush with the same compressor that the table uses. This # was the pre 4.0 behavior. 
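As a sketch of the per-table configuration path mentioned in the commit message (class and method names here are illustrative; the create() factory and option names are introduced by AdaptiveCompressor.java later in this patch), the compressor accepts min/max compression levels as string options; 7 and 12 mirror the GENERAL-use defaults:

    import java.util.Map;

    import org.apache.cassandra.io.compress.AdaptiveCompressor;
    import org.apache.cassandra.io.compress.ICompressor;

    public final class AdaptiveCompressionExample
    {
        // Valid levels are 0..15; higher is slower but compresses better (see the AdaptiveCompressor javadoc).
        static ICompressor compactionCompressor()
        {
            return AdaptiveCompressor.create(Map.of("min_compression_level", "7",
                                                    "max_compression_level", "12"));
        }
    }

In CQL the compressor is selected per table via compression = {'sstable_compression': 'AdaptiveCompressor'}, as exercised by CQLCompressionTest further below.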
# diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 68ce61c7900d..655256e4e45c 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -20,6 +20,7 @@ import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -37,6 +38,7 @@ import org.apache.cassandra.fql.FullQueryLoggerOptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.guardrails.GuardrailsConfig; +import org.apache.cassandra.io.compress.AdaptiveCompressor; import org.apache.cassandra.utils.FBUtilities; /** @@ -292,8 +294,9 @@ public class Config public double commitlog_sync_group_window_in_ms = Double.NaN; public int commitlog_sync_period_in_ms; public int commitlog_segment_size_in_mb = 32; + public ParameterizedClass commitlog_compression; - public FlushCompression flush_compression = FlushCompression.fast; + public FlushCompression flush_compression; public int commitlog_max_compression_buffers_in_pool = 3; public Integer periodic_commitlog_sync_lag_block_in_ms; public TransparentDataEncryptionOptions transparent_data_encryption_options = new TransparentDataEncryptionOptions(); @@ -664,6 +667,7 @@ public enum FlushCompression { none, fast, + adaptive, table } diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 84b2278909c1..7c54de174f17 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -2119,7 +2119,9 @@ public static void setCommitLogCompression(ParameterizedClass compressor) public static Config.FlushCompression getFlushCompression() { - return conf.flush_compression; + return Objects.requireNonNullElseGet(conf.flush_compression, () -> shouldUseAdaptiveCompressionByDefault() + ? Config.FlushCompression.adaptive + : Config.FlushCompression.fast); } public static void setFlushCompression(Config.FlushCompression compression) @@ -2127,7 +2129,12 @@ public static void setFlushCompression(Config.FlushCompression compression) conf.flush_compression = compression; } - /** + public static boolean shouldUseAdaptiveCompressionByDefault() + { + return System.getProperty("cassandra.default_sstable_compression", "fast").equals("adaptive"); + } + + /** * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use * more, depending on how soon the sync policy stops all writing threads. diff --git a/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java b/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java new file mode 100644 index 000000000000..e83a2c859209 --- /dev/null +++ b/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.EnumMap; +import java.util.EnumSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; +import com.github.luben.zstd.Zstd; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.memtable.AbstractAllocatorMemtable; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.metrics.CassandraMetricsRegistry; +import org.apache.cassandra.metrics.DefaultNameFactory; +import org.apache.cassandra.metrics.MetricNameFactory; +import org.apache.cassandra.utils.ExpMovingAverage; + +/** + * A compressor that dynamically adapts the compression level to the load. + * If the system is not heavily loaded by writes, data are compressed using high compression level. + * If the number of compactions or flushes queue up, it decreases the compression level to speed up + * flushing / compacting. + *
<p>
+ * Underneath, the ZStandard compressor is used. The compression level can be changed between frames, + * even when compressing the same sstable file. ZStandard was chosen because at the fast end it + * can reach the compression speed of LZ4, but at moderate compression levels it usually offers a much better + * compression ratio (typically files are smaller by 20-40%) than LZ4 without compromising compression speed too much. + *
<p>
+ * This compressor can be used for either of the Uses: FAST_COMPRESSION or GENERAL. + * Each use can have different minimum and maximum compression level limits. + * For FAST_COMPRESSION, the number of pending flushes is used as the indicator of write load. + * For GENERAL compression, the number of pending compactions is used as the indicator of write load. + *
<p>
+ * Valid compression levels are in range 0..15 (inclusive), where 0 means fastest compression and 15 means slowest/best. + * Usually levels around 7-11 strike the best balance between performance and compression ratio. + * Going above level 12 usually only results in slower compression but not much compression ratio improvement. + *
<p>
+ * Caution: This compressor decompresses about 2x-4x slower than LZ4Compressor, regardless of the compression level. + * Therefore, it may negatively affect read speed from very read-heavy tables, especially when the chunk-cache + * hit ratio is low. In synthetic tests with chunk cache disabled, read throughput turned out to be up to 10% + * lower than when using LZ4 on some workloads. + */ +public class AdaptiveCompressor implements ICompressor +{ + @VisibleForTesting + static final Map metrics = new EnumMap<>(Map.of( + Uses.FAST_COMPRESSION, new Metrics(Uses.FAST_COMPRESSION), + Uses.GENERAL, new Metrics(Uses.GENERAL) + )); + + protected static final String MIN_COMPRESSION_LEVEL_OPTION_NAME = "min_compression_level"; + protected static final String MAX_COMPRESSION_LEVEL_OPTION_NAME = "max_compression_level"; + protected static final String MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME = "max_compaction_queue_length"; + + /** + * Maps AdaptiveCompressor compression level to underlying ZStandard compression levels. + * This mapping is needed because ZStandard levels are not continuous, zstd level 0 is special and means level 3. + * Hence, we just use our own continuous scale starting at 0. + */ + private static final int[] zstdCompressionLevels = { + -7, // 0 (very fast but compresses poorly) + -6, // 1 + -5, // 2 (LZ4 level is somewhere here) + -4, // 3 + -3, // 4 + -2, // 5 + -1, // 6 + 1, // 7 (sweet spot area usually here) + 2, // 8 (sweet spot area usually here) + 3, // 9 (sweet spot area usually here, ~50% slower than LZ4) + 4, // 10 (sweet spot area usually here) + 5, // 11 (sweet spot area usually here) + 6, // 12 + 7, // 13 + 8, // 14 + 9, // 15 (very slow, usually over 10x slower than LZ4) + }; + + public static final int MIN_COMPRESSION_LEVEL = 0; + public static final int MAX_COMPRESSION_LEVEL = 15; + + public static final int DEFAULT_MIN_FAST_COMPRESSION_LEVEL = 2; // zstd level -5 + public static final int DEFAULT_MAX_FAST_COMPRESSION_LEVEL = 9; // zstd level 5 + public static final int DEFAULT_MIN_GENERAL_COMPRESSION_LEVEL = 7; // zstd level 1 + public static final int DEFAULT_MAX_GENERAL_COMPRESSION_LEVEL = 12; // zstd level 6 + public static final int DEFAULT_MAX_COMPACTION_QUEUE_LENGTH = 16; + + private static final ConcurrentHashMap instances = new ConcurrentHashMap<>(); + + public static AdaptiveCompressor create(Map options) + { + int minCompressionLevel = getMinCompressionLevel(Uses.GENERAL, options); + int maxCompressionLevel = getMaxCompressionLevel(Uses.GENERAL, options); + int maxCompactionQueueLength = getMaxCompactionQueueLength(options); + return createForCompaction(minCompressionLevel, maxCompressionLevel, maxCompactionQueueLength); + } + + private static AdaptiveCompressor createForCompaction(int minCompressionLevel, int maxCompressionLevel, int maxCompactionQueueLength) + { + Params params = new Params(Uses.GENERAL, minCompressionLevel, maxCompressionLevel, maxCompactionQueueLength); + Supplier compactionPressureSupplier = () -> getCompactionPressure(maxCompactionQueueLength); + return instances.computeIfAbsent(params, p -> new AdaptiveCompressor(p, compactionPressureSupplier)); + } + + /** + * Creates a compressor that doesn't refer to any other C* components like compaction manager or memory pools. 
+ */ + @VisibleForTesting + public static ICompressor createForUnitTesting() + { + Params params = new Params(Uses.GENERAL, 9, 9, 0); + return new AdaptiveCompressor(params, () -> 0.0); + } + + public static AdaptiveCompressor createForFlush(Map options) + { + int minCompressionLevel = getMinCompressionLevel(Uses.FAST_COMPRESSION, options); + int maxCompressionLevel = getMaxCompressionLevel(Uses.FAST_COMPRESSION, options); + return createForFlush(minCompressionLevel, maxCompressionLevel); + } + + private static AdaptiveCompressor createForFlush(int minCompressionLevel, int maxCompressionLevel) + { + Params params = new Params(Uses.FAST_COMPRESSION, minCompressionLevel, maxCompressionLevel, 0); + return instances.computeIfAbsent(params, p -> new AdaptiveCompressor(p, AdaptiveCompressor::getFlushPressure)); + } + + private final Params params; + private final ThreadLocal state; + private final Supplier writePressureSupplier; + + + static class Params + { + final Uses use; + final int minCompressionLevel; + final int maxCompressionLevel; + final int maxCompactionQueueLength; + + Params(Uses use, int minCompressionLevel, int maxCompressionLevel, int maxCompactionQueueLength) + { + if (minCompressionLevel < MIN_COMPRESSION_LEVEL || minCompressionLevel > MAX_COMPRESSION_LEVEL) + throw new IllegalArgumentException("Min compression level " + minCompressionLevel + "out of range" + + " [" + MIN_COMPRESSION_LEVEL + ", " + MAX_COMPRESSION_LEVEL + ']'); + if (maxCompressionLevel < MIN_COMPRESSION_LEVEL || maxCompressionLevel > MAX_COMPRESSION_LEVEL) + throw new IllegalArgumentException("Max compression level " + maxCompressionLevel + "out of range" + + " [" + MIN_COMPRESSION_LEVEL + ", " + MAX_COMPRESSION_LEVEL + ']'); + if (maxCompactionQueueLength < 0) + throw new IllegalArgumentException("Negative max compaction queue length: " + maxCompactionQueueLength); + + this.use = use; + this.minCompressionLevel = minCompressionLevel; + this.maxCompressionLevel = maxCompressionLevel; + this.maxCompactionQueueLength = maxCompactionQueueLength; + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + Params params = (Params) o; + return minCompressionLevel == params.minCompressionLevel && maxCompressionLevel == params.maxCompressionLevel && use == params.use; + } + + @Override + public int hashCode() + { + return Objects.hash(use, minCompressionLevel, maxCompressionLevel); + } + } + + /** + * Keeps thread local state. + * We need this because we want to not only monitor pending flushes/compactions but also how much + * time we spend in compression relative to time spent by the thread in non-compression tasks like preparation + * of data or writing. Because ICompressor can be shared by multiple threads, we need to keep + * track of each thread separately. + */ + class State + { + /** + * ZStandard compression level that was used when compressing the previous chunk. + * Can be adjusted up or down by at most 1 with every next block. + */ + int currentCompressionLevel; + + /** + * How much time is spent by the thread in compression relative to the time spent in non-compression code. + * Valid range is [0.0, 1.0]. + * 1.0 means we're doing only compression and nothing else. + * 0.0 means we're not spending any time doing compression. + * This indicator allows us to detect whether we're bottlenecked by something else than compression, + * e.g. by disk I/O or by preparation of data to compress (e.g. iterating the memtable trie). 
+ * If this value is low, then there is not much gain in decreasing the compression + * level. + */ + ExpMovingAverage relativeTimeSpentCompressing = ExpMovingAverage.decayBy10(); + + long lastCompressionStartTime; + long lastCompressionDuration; + + /** + * Computes the new compression level to use for the next chunk, based on the load. + */ + public int adjustAndGetCompressionLevel(long currentTime) + { + // The more write "pressure", the faster we want to go, so the lower the desired compression level. + double pressure = getWritePressure(); + assert pressure >= 0.0 && pressure <= 1.0 : "pressure (" + pressure + ") out of valid range [0.0, 1.0]"; + + // Use minCompressionLevel when pressure = 1.0, maxCompressionLevel when pressure = 0.0 + int pressurePoints = (int) (pressure * (params.maxCompressionLevel - params.minCompressionLevel)); + int compressionLevelTarget = params.maxCompressionLevel - pressurePoints; + + // We use wall clock time and not CPU time, because we also want to include time spent by I/O. + // If we're bottlenecked by writing the data to disk, this indicator should be low. + double relativeTimeSpentCompressing = (double) (1 + lastCompressionDuration) / (1 + currentTime - lastCompressionStartTime); + + // Some smoothing is needed to avoid changing level too fast due to performance hiccups + this.relativeTimeSpentCompressing.update(relativeTimeSpentCompressing); + + // If we're under pressure to write data fast, we need to decrease compression level. + // But we do that only if we're really spending significant amount of time doing compression. + if (compressionLevelTarget < currentCompressionLevel && this.relativeTimeSpentCompressing.get() > 0.1) + currentCompressionLevel--; + // If we're not under heavy write pressure, or we're spending very little time compressing data, + // we can increase the compression level and get some space savings at a low performance overhead: + else if (compressionLevelTarget > currentCompressionLevel || this.relativeTimeSpentCompressing.get() < 0.02) + currentCompressionLevel++; + + currentCompressionLevel = clampCompressionLevel(currentCompressionLevel); + return currentCompressionLevel; + } + + /** + * Must be called after compressing a chunk, + * so we can measure how much time we spend in compressing vs time spent not-compressing. + */ + public void recordCompressionDuration(long startTime, long endTime) + { + this.lastCompressionDuration = endTime - startTime; + this.lastCompressionStartTime = startTime; + } + + @VisibleForTesting + double getRelativeTimeSpentCompressing() + { + return this.relativeTimeSpentCompressing.get(); + } + } + + /** + * @param params user-provided configuration such as min/max compression level range + * @param writePressureSupplier returns a non-negative score determining the write load on the system which + * is used to control the desired compression level. Influences the compression + * level linearly: an inceease of pressure by 1 point causes the target + * compression level to be decreased by 1 point. Zero will select the + * maximum allowed compression level. 
+ */ + @VisibleForTesting + AdaptiveCompressor(Params params, Supplier writePressureSupplier) + { + this.params = params; + this.state = new ThreadLocal<>(); + this.writePressureSupplier = writePressureSupplier; + } + + @Override + public int initialCompressedBufferLength(int chunkLength) + { + return (int) Zstd.compressBound(chunkLength); + } + + + @Override + public void compress(ByteBuffer input, ByteBuffer output) throws IOException + { + try + { + State state = getThreadLocalState(); + long startTime = System.nanoTime(); + int compressionLevel = zstdCompressionLevels[state.adjustAndGetCompressionLevel(startTime)]; + Zstd.compress(output, input, compressionLevel, true); + long endTime = System.nanoTime(); + state.recordCompressionDuration(startTime, endTime); + metrics.get(params.use).updateFrom(state); + } + catch (Exception e) + { + throw new IOException("Compression failed", e); + } + + } + + @Override + public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException + { + long dsz = Zstd.decompressByteArray(output, outputOffset, output.length - outputOffset, + input, inputOffset, inputLength); + + if (Zstd.isError(dsz)) + throw new IOException(String.format("Decompression failed due to %s", Zstd.getErrorName(dsz))); + + return (int) dsz; + } + + @Override + public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException + { + try + { + Zstd.decompress(output, input); + } catch (Exception e) + { + throw new IOException("Decompression failed", e); + } + } + + @Override + public BufferType preferredBufferType() + { + return BufferType.OFF_HEAP; + } + + @Override + public Set recommendedUses() + { + return params.minCompressionLevel <= DEFAULT_MIN_FAST_COMPRESSION_LEVEL + ? EnumSet.of(Uses.GENERAL, Uses.FAST_COMPRESSION) + : EnumSet.of(params.use); + } + + @Override + public ICompressor forUse(Uses use) + { + if (use == params.use) + return this; + + switch (use) + { + case GENERAL: + return createForCompaction(params.minCompressionLevel, params.maxCompressionLevel, params.maxCompactionQueueLength); + case FAST_COMPRESSION: + return createForFlush(params.minCompressionLevel, params.maxCompressionLevel); + } + + return null; + } + + @Override + public boolean supports(BufferType bufferType) + { + return bufferType == BufferType.OFF_HEAP; + } + + @Override + public Set supportedOptions() + { + return Set.of("max_compression_level", "min_compression_level"); + } + + private static int getMinCompressionLevel(Uses mode, Map options) + { + int defaultValue = mode == Uses.FAST_COMPRESSION ? DEFAULT_MIN_FAST_COMPRESSION_LEVEL : DEFAULT_MIN_GENERAL_COMPRESSION_LEVEL; + return getIntOption(options, MIN_COMPRESSION_LEVEL_OPTION_NAME, defaultValue); + } + + private static int getMaxCompressionLevel(Uses mode, Map options) + { + var defaultValue = mode == Uses.FAST_COMPRESSION ? 
DEFAULT_MAX_FAST_COMPRESSION_LEVEL : DEFAULT_MAX_GENERAL_COMPRESSION_LEVEL; + return getIntOption(options, MAX_COMPRESSION_LEVEL_OPTION_NAME, defaultValue); + } + + private static int getMaxCompactionQueueLength(Map options) + { + return getIntOption(options, MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME, DEFAULT_MAX_COMPACTION_QUEUE_LENGTH); + } + + private static int getIntOption(Map options, String key, int defaultValue) + { + if (options == null) + return defaultValue; + + String val = options.get(key); + if (val == null) + return defaultValue; + + return Integer.parseInt(val); + } + + private double getWritePressure() + { + return writePressureSupplier.get(); + } + + private static double getFlushPressure() + { + var memoryPool = AbstractAllocatorMemtable.MEMORY_POOL; + var usedRatio = Math.max(memoryPool.onHeap.usedRatio(), memoryPool.offHeap.usedRatio()); + var cleanupThreshold = DatabaseDescriptor.getMemtableCleanupThreshold(); + // we max out the pressure when we're halfway between the cleanupThreshold and max memory + // so we still have some memory left while compression already working at max speed; + // setting the compressor to maximum speed when we exhausted all memory would be too late + return Math.min(1.0, Math.max(0.0, 2 * (usedRatio - cleanupThreshold)) / (1.0 - cleanupThreshold)); + } + + private static double getCompactionPressure(int maxCompactionQueueLength) + { + CompactionManager compactionManager = CompactionManager.instance; + long rateLimit = DatabaseDescriptor.getCompactionThroughputMbPerSec() * FileUtils.ONE_MB; + if (rateLimit == 0) + rateLimit = Long.MAX_VALUE; + double actualRate = compactionManager.getMetrics().bytesCompactedThroughput.getOneMinuteRate(); + // We don't want to speed up compression if we can keep up with the configured compression rate limit + // 0.0 if actualRate >= rateLimit + // 1.0 if actualRate <= 0.8 * rateLimit; + double rateLimitFactor = Math.min(1.0, Math.max(0.0, (rateLimit - actualRate) / (0.2 * rateLimit))); + + long pendingCompactions = compactionManager.getPendingTasks(); + long activeCompactions = compactionManager.getActiveCompactions(); + long queuedCompactions = pendingCompactions - activeCompactions; + double compactionQueuePressure = Math.min(1.0, (double) queuedCompactions / (maxCompactionQueueLength * DatabaseDescriptor.getConcurrentCompactors())); + return compactionQueuePressure * rateLimitFactor; + } + + private int clampCompressionLevel(long compressionLevel) + { + return (int) Math.min(params.maxCompressionLevel, Math.max(params.minCompressionLevel, compressionLevel)); + } + + @VisibleForTesting + State getThreadLocalState() + { + State state = this.state.get(); + if (state == null) + { + state = new State(); + state.currentCompressionLevel = params.maxCompressionLevel; + state.lastCompressionDuration = 0; + this.state.set(state); + } + return state; + } + + static class Metrics + { + private final Counter[] compressionLevelHistogram; // separate counters for each compression level + private final Histogram relativeTimeSpentCompressing; // in % (i.e. 
multiplied by 100 becaue Histogram can only keep integers) + + Metrics(Uses use) + { + MetricNameFactory factory = new DefaultNameFactory("AdaptiveCompression"); + + // cannot use Metrics.histogram for compression levels, because histograms do not handle negative numbers; + // also this histogram is small enough that storing all buckets is not a problem, but it gives + // much more information + compressionLevelHistogram = new Counter[MAX_COMPRESSION_LEVEL + 1]; + for (int i = 0; i < compressionLevelHistogram.length; i++) + { + CassandraMetricsRegistry.MetricName metricName = factory.createMetricName(String.format("CompressionLevel_%s_%02d", use.name(), i)); + compressionLevelHistogram[i] = CassandraMetricsRegistry.Metrics.counter(metricName); + } + + relativeTimeSpentCompressing = CassandraMetricsRegistry.Metrics.histogram(factory.createMetricName("RelativeTimeSpentCompressing_" + use.name()), true); + } + + void updateFrom(State state) + { + compressionLevelHistogram[state.currentCompressionLevel].inc(); + relativeTimeSpentCompressing.update((int)(state.getRelativeTimeSpentCompressing() * 100.0)); + } + } +} diff --git a/src/java/org/apache/cassandra/io/compress/ICompressor.java b/src/java/org/apache/cassandra/io/compress/ICompressor.java index fd6a104431b3..b6f8b2c50eea 100644 --- a/src/java/org/apache/cassandra/io/compress/ICompressor.java +++ b/src/java/org/apache/cassandra/io/compress/ICompressor.java @@ -83,4 +83,18 @@ default Set recommendedUses() { return ImmutableSet.copyOf(EnumSet.allOf(Uses.class)); } + + /** + * Returns the compressor configured for a particular use. + * Allows creating a compressor implementation that can handle multiple uses but requires different configurations + * adapted to a particular use. + *
<p>
+ * May return this object. + * May not modify this object. + * Should return null if the request cannot be satisfied. + */ + default ICompressor forUse(Uses use) + { + return recommendedUses().contains(use) ? this : null; + } } diff --git a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java index a8884831a9a8..4b0bbd96dda4 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collection; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -43,6 +44,7 @@ import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.guardrails.Guardrails; import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.AdaptiveCompressor; import org.apache.cassandra.io.compress.CompressedSequentialWriter; import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.sstable.Component; @@ -153,13 +155,21 @@ public static CompressionParams compressionFor(final OperationType opType, Table case fast: if (!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION)) { - // The default compressor is generally fast (LZ4 with 16KiB block size) - compressionParams = CompressionParams.DEFAULT; + compressionParams = CompressionParams.FAST; + break; + } + // else fall through + case adaptive: + if (!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION)) + { + compressionParams = CompressionParams.FAST_ADAPTIVE; break; } // else fall through case table: default: + compressionParams = Optional.ofNullable(compressionParams.forUse(ICompressor.Uses.FAST_COMPRESSION)) + .orElse(compressionParams); break; } } diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java index 53760a91893e..22a07a97a582 100644 --- a/src/java/org/apache/cassandra/schema/CompressionParams.java +++ b/src/java/org/apache/cassandra/schema/CompressionParams.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import java.util.concurrent.ThreadLocalRandom; import com.google.common.annotations.VisibleForTesting; @@ -35,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.exceptions.ConfigurationException; @@ -66,12 +68,26 @@ public final class CompressionParams public static final String ENABLED = "enabled"; public static final String MIN_COMPRESS_RATIO = "min_compress_ratio"; - public static final CompressionParams DEFAULT = new CompressionParams(LZ4Compressor.create(Collections.emptyMap()), + public static final CompressionParams FAST = new CompressionParams(LZ4Compressor.create(Collections.emptyMap()), DEFAULT_CHUNK_LENGTH, calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO), DEFAULT_MIN_COMPRESS_RATIO, Collections.emptyMap()); + public static final CompressionParams ADAPTIVE = new CompressionParams(AdaptiveCompressor.create(Collections.emptyMap()), + DEFAULT_CHUNK_LENGTH, + calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO), + DEFAULT_MIN_COMPRESS_RATIO, + 
Collections.emptyMap()); + + public static final CompressionParams FAST_ADAPTIVE = new CompressionParams(AdaptiveCompressor.createForFlush(Collections.emptyMap()), + DEFAULT_CHUNK_LENGTH, + calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO), + DEFAULT_MIN_COMPRESS_RATIO, + Collections.emptyMap()); + + public static final CompressionParams DEFAULT = DatabaseDescriptor.shouldUseAdaptiveCompressionByDefault() ? ADAPTIVE : FAST; + public static final CompressionParams NOOP = new CompressionParams(NoopCompressor.create(Collections.emptyMap()), // 4 KiB is often the underlying disk block size 1024 * 4, @@ -249,6 +265,24 @@ public boolean isEnabled() return sstableCompressor != null; } + /** + * Specializes the compressor for given use. + * May cause reconfiguration of parameters on some compressors. + * Returns null if params are not compatible with the given use. + */ + public CompressionParams forUse(ICompressor.Uses use) + { + ICompressor specializedCompressor = this.sstableCompressor.forUse(use); + if (specializedCompressor == null) + return null; + + assert specializedCompressor.recommendedUses().contains(use); + if (specializedCompressor == sstableCompressor) + return this; + + return new CompressionParams(specializedCompressor, chunkLength, maxCompressedLength, minCompressRatio, otherOptions); + } + /** * Returns the SSTable compressor. * @return the SSTable compressor or {@code null} if compression is disabled. diff --git a/test/unit/org/apache/cassandra/io/compress/AdaptiveCompressorTest.java b/test/unit/org/apache/cassandra/io/compress/AdaptiveCompressorTest.java new file mode 100644 index 000000000000..b8cf758e933d --- /dev/null +++ b/test/unit/org/apache/cassandra/io/compress/AdaptiveCompressorTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; + +import com.google.common.collect.ImmutableMap; +import org.junit.Test; + +import org.apache.cassandra.io.util.RandomAccessReader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class AdaptiveCompressorTest +{ + + @Test(expected = IllegalArgumentException.class) + public void badCompressionLevelParamThrowsExceptionMin() + { + AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MIN_COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(AdaptiveCompressor.MIN_COMPRESSION_LEVEL - 1))); + } + + @Test(expected = IllegalArgumentException.class) + public void badCompressionLevelParamThrowsExceptionMax() + { + AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MAX_COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(AdaptiveCompressor.MAX_COMPRESSION_LEVEL + 1))); + } + + @Test(expected = IllegalArgumentException.class) + public void badMaxCompactionQueueLengthParamThrowsExceptionMin() + { + AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME, "-1")); + } + + @Test + public void averageRelativeTimeCompressingIsMeasuredProperly() throws IOException, InterruptedException + { + var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 15, 15, 0); + AdaptiveCompressor c1 = new AdaptiveCompressor(params, () -> 0.0); + ByteBuffer src = getSrcByteBuffer(); + ByteBuffer dest = getDstByteBuffer(c1); + for (int i = 0; i < 20000; i++) + { + compress(c1, src, dest); + } + assertTrue(c1.getThreadLocalState().getRelativeTimeSpentCompressing() > 0.8); + assertTrue(c1.getThreadLocalState().getRelativeTimeSpentCompressing() < 1.0); + + + var params2 = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 0, 0, 0); + AdaptiveCompressor c2 = new AdaptiveCompressor(params2, () -> 0.0); + for (int i = 0; i < 100; i++) + { + Thread.sleep(1); + compress(c2, src, dest); + } + assertTrue(c2.getThreadLocalState().getRelativeTimeSpentCompressing() < 0.02); + assertTrue(c2.getThreadLocalState().getRelativeTimeSpentCompressing() > 0.0); + } + + @Test + public void compressionLevelAdaptsToWritePressure() throws IOException + { + var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 2, 8, 0); + double[] load = { 1.0 }; + + AdaptiveCompressor c = new AdaptiveCompressor(params, () -> load[0]); + ByteBuffer src = getSrcByteBuffer(); + ByteBuffer dest = getDstByteBuffer(c); + + for (int i = 0; i < 10; i++) + compress(c, src, dest); + + assertEquals(2, c.getThreadLocalState().currentCompressionLevel); + + // Low load; compression level must be increased back to max: + load[0] = 0L; + + for (int i = 0; i < 10; i++) + compress(c, src, dest); + + assertEquals(8, c.getThreadLocalState().currentCompressionLevel); + } + + @Test + public void compressionLevelDoesNotDecreaseWhenCompressionIsNotABottleneck() throws IOException, InterruptedException + { + var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 2, 8, 0); + // Simulate high write load + AdaptiveCompressor c = new AdaptiveCompressor(params, () -> 1.0); + ByteBuffer src = getSrcByteBuffer(); + ByteBuffer dest = getDstByteBuffer(c); + + for (int i = 0; i < 200; i++) + { + Thread.sleep(1); // creates artificial bottleneck that is much slower than compression + compress(c, src, dest); + } + + assertEquals(8, c.getThreadLocalState().currentCompressionLevel); + } + + private static ByteBuffer 
getDstByteBuffer(ICompressor compressor) + { + return ByteBuffer.allocateDirect(compressor.initialCompressedBufferLength(RandomAccessReader.DEFAULT_BUFFER_SIZE)); + } + + private static ByteBuffer getSrcByteBuffer() + { + int n = RandomAccessReader.DEFAULT_BUFFER_SIZE; + byte[] srcData = new byte[n]; + new Random().nextBytes(srcData); + + ByteBuffer src = ByteBuffer.allocateDirect(n); + src.put(srcData, 0, n); + src.flip().position(0); + return src; + } + + private static void compress(AdaptiveCompressor c, ByteBuffer src, ByteBuffer dest) throws IOException + { + c.compress(src, dest); + src.rewind(); + dest.rewind(); + } + +} diff --git a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java index a1e09ed77895..0f117095b0e6 100644 --- a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java @@ -174,6 +174,39 @@ public void zstdFlushTest() throws Throwable }); } + @Test + public void adaptiveFlushTest() throws Throwable + { + createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'AdaptiveCompressor'};"); + DatabaseDescriptor.setFlushCompression(Config.FlushCompression.fast); + ColumnFamilyStore store = flushTwice(); + + // Should flush as LZ4 + Set sstables = store.getLiveSSTables(); + sstables.forEach(sstable -> { + assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof LZ4Compressor); + }); + store.truncateBlocking(); + + DatabaseDescriptor.setFlushCompression(Config.FlushCompression.adaptive); + store = flushTwice(); + + // Should flush as Adaptive + sstables = store.getLiveSSTables(); + sstables.forEach(sstable -> { + assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof AdaptiveCompressor); + }); + + // Should compact to Adaptive + compact(); + + sstables = store.getLiveSSTables(); + assertEquals(1, sstables.size()); + store.getLiveSSTables().forEach(sstable -> { + assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof AdaptiveCompressor); + }); + } + @Test public void deflateFlushTest() throws Throwable { diff --git a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java index a3a64babfa7d..7cf37844892e 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java @@ -29,8 +29,10 @@ import com.google.common.io.Files; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; @@ -41,6 +43,12 @@ public class CompressorTest { + @BeforeClass + public static void initialize() + { + DatabaseDescriptor.daemonInitialization(); + } + ICompressor compressor; ICompressor[] compressors = new ICompressor[] { @@ -48,6 +56,7 @@ public class CompressorTest DeflateCompressor.create(Collections.emptyMap()), SnappyCompressor.create(Collections.emptyMap()), ZstdCompressor.create(Collections.emptyMap()), + AdaptiveCompressor.createForUnitTesting(), NoopCompressor.create(Collections.emptyMap()) }; @@ -190,6 +199,13 @@ public void testZstdByteBuffers() throws IOException testByteBuffers(); } + @Test + public void 
testAdaptiveByteBuffers() throws IOException + { + compressor = AdaptiveCompressor.createForUnitTesting(); + testByteBuffers(); + } + @Test public void testNoopByteBuffers() throws IOException { From e6baaeefe109ead166f5efa9ea6500d65d649416 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Wed, 18 Dec 2024 18:08:46 +0100 Subject: [PATCH 3/3] Reuse ZStd compression/decompression context Reduces some overhead of setting up / tearing down those contexts that happened inside the calls to Zstd.compress / Zstd.decompress. Makes a difference with very small chunks. Additionally, added some compression/decompression rate metrics. --- .../io/compress/AdaptiveCompressor.java | 38 ++++++++++++++----- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java b/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java index e83a2c859209..cc14c0aebeab 100644 --- a/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java +++ b/src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java @@ -32,7 +32,10 @@ import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; import com.github.luben.zstd.Zstd; +import com.github.luben.zstd.ZstdCompressCtx; +import com.github.luben.zstd.ZstdDecompressCtx; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.memtable.AbstractAllocatorMemtable; @@ -79,6 +82,7 @@ Uses.GENERAL, new Metrics(Uses.GENERAL) protected static final String MAX_COMPRESSION_LEVEL_OPTION_NAME = "max_compression_level"; protected static final String MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME = "max_compaction_queue_length"; + /** * Maps AdaptiveCompressor compression level to underlying ZStandard compression levels. * This mapping is needed because ZStandard levels are not continuous, zstd level 0 is special and means level 3. @@ -156,7 +160,6 @@ private static AdaptiveCompressor createForFlush(int minCompressionLevel, int ma private final ThreadLocal state; private final Supplier writePressureSupplier; - static class Params { final Uses use; @@ -205,6 +208,9 @@ public int hashCode() */ class State { + final ZstdCompressCtx compressCtx = new ZstdCompressCtx().setChecksum(true); + final ZstdDecompressCtx decompressCtx = new ZstdDecompressCtx(); + /** * ZStandard compression level that was used when compressing the previous chunk. * Can be adjusted up or down by at most 1 with every next block. @@ -229,7 +235,7 @@ class State /** * Computes the new compression level to use for the next chunk, based on the load. */ - public int adjustAndGetCompressionLevel(long currentTime) + public void adjustCompressionLevel(long currentTime) { // The more write "pressure", the faster we want to go, so the lower the desired compression level. 
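// Worked example (illustrative): with minCompressionLevel=2 and maxCompressionLevel=8 (as in AdaptiveCompressorTest),
// a pressure of 0.5 gives pressurePoints = (int) (0.5 * 6) = 3, so compressionLevelTarget = 8 - 3 = 5.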
double pressure = getWritePressure(); @@ -256,7 +262,7 @@ else if (compressionLevelTarget > currentCompressionLevel || this.relativeTimeSp currentCompressionLevel++; currentCompressionLevel = clampCompressionLevel(currentCompressionLevel); - return currentCompressionLevel; + compressCtx.setLevel(zstdCompressionLevels[currentCompressionLevel]); } /** @@ -306,11 +312,15 @@ public void compress(ByteBuffer input, ByteBuffer output) throws IOException { State state = getThreadLocalState(); long startTime = System.nanoTime(); - int compressionLevel = zstdCompressionLevels[state.adjustAndGetCompressionLevel(startTime)]; - Zstd.compress(output, input, compressionLevel, true); + state.adjustCompressionLevel(startTime); + long inputSize = input.remaining(); + state.compressCtx.compress(output, input); long endTime = System.nanoTime(); state.recordCompressionDuration(startTime, endTime); - metrics.get(params.use).updateFrom(state); + + Metrics m = metrics.get(params.use); + m.updateFrom(state); + m.compressionRate.mark(inputSize); } catch (Exception e) { @@ -322,12 +332,14 @@ public void compress(ByteBuffer input, ByteBuffer output) throws IOException @Override public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException { - long dsz = Zstd.decompressByteArray(output, outputOffset, output.length - outputOffset, - input, inputOffset, inputLength); + State state = getThreadLocalState(); + long dsz = state.decompressCtx.decompressByteArray(output, outputOffset, output.length - outputOffset, + input, inputOffset, inputLength); if (Zstd.isError(dsz)) throw new IOException(String.format("Decompression failed due to %s", Zstd.getErrorName(dsz))); + metrics.get(params.use).decompressionRate.mark(dsz); return (int) dsz; } @@ -336,7 +348,9 @@ public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException { try { - Zstd.decompress(output, input); + State state = getThreadLocalState(); + long dsz = state.decompressCtx.decompress(output, input); + metrics.get(params.use).decompressionRate.mark(dsz); } catch (Exception e) { throw new IOException("Decompression failed", e); @@ -473,6 +487,9 @@ static class Metrics { private final Counter[] compressionLevelHistogram; // separate counters for each compression level private final Histogram relativeTimeSpentCompressing; // in % (i.e. multiplied by 100 becaue Histogram can only keep integers) + private final Meter compressionRate; + private final Meter decompressionRate; + Metrics(Uses use) { @@ -489,6 +506,9 @@ static class Metrics } relativeTimeSpentCompressing = CassandraMetricsRegistry.Metrics.histogram(factory.createMetricName("RelativeTimeSpentCompressing_" + use.name()), true); + + compressionRate = CassandraMetricsRegistry.Metrics.meter(factory.createMetricName("CompressionRate_" + use.name())); + decompressionRate = CassandraMetricsRegistry.Metrics.meter(factory.createMetricName("DecompressionRate_" + use.name())); } void updateFrom(State state)