CNDB-11532: Adaptive compression
This commit introduces a new AdaptiveCompressor class.

AdaptiveCompressor uses ZStandard compression with a dynamic
compression level based on the current write load. AdaptiveCompressor's
goal is to provide write performance similar to LZ4Compressor
for write-heavy workloads, but a significantly better compression ratio
for databases with a moderate amount of writes or on systems
with plenty of spare CPU power.

If the memtable flush queue builds up and compression turns out to be
a significant bottleneck, the compression level used for flushing is
decreased to gain speed. Similarly, when pending compaction tasks build
up, the compression level used for compaction is decreased.
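
The adjustment policy can be pictured with this minimal sketch. It is
an illustration only; the method name and the exact mapping are
assumptions, not the actual AdaptiveCompressor internals:

```java
// Illustrative sketch, not the committed implementation: map current
// backlog pressure (0.0 = idle, 1.0 = saturated) to a ZStandard level
// between minLevel (fastest) and maxLevel (best compression ratio).
static int levelFor(double pressure, int minLevel, int maxLevel)
{
    // Higher pressure selects a lower, faster compression level.
    return maxLevel - (int) Math.round(pressure * (maxLevel - minLevel));
}
```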

In order to enable adaptive compression (see the examples below):
  - set the `-Dcassandra.default_sstable_compression=adaptive` JVM option
    to automatically select `AdaptiveCompressor` as the main compressor
    for flushes and new tables, if not overridden by specific options in
    cassandra.yaml or the table schema
  - set `flush_compression: adaptive` in cassandra.yaml to enable it
    for flushing
  - set `AdaptiveCompressor` in the table options to enable it
    for compaction
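
For example (the keyspace and table names below are hypothetical; the
CQL option name matches the one exercised in CQLCompressionTest at the
bottom of this commit):

```
# JVM option (e.g. in jvm.options)
-Dcassandra.default_sstable_compression=adaptive

# cassandra.yaml
flush_compression: adaptive

-- CQL, per table
ALTER TABLE ks.tbl WITH compression = {'sstable_compression': 'AdaptiveCompressor'};
```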

Caution: this feature is not turned on by default because it
may impact read speed negatively in some rare cases.

Fixes riptano/cndb#11532
pkolaczk committed Dec 3, 2024
1 parent d600e32 commit 9efb5fa
Showing 10 changed files with 773 additions and 6 deletions.
2 changes: 2 additions & 0 deletions conf/cassandra.yaml
@@ -479,6 +479,8 @@ commitlog_segment_size_in_mb: 32
# none : Flush without compressing blocks but while still doing checksums.
# fast : Flush with a fast compressor. If the table is already using a
# fast compressor that compressor is used.
# adaptive : Flush with a fast adaptive compressor. If the table is already using a
# fast compressor that compressor is used.
# table: Always flush with the same compressor that the table uses. This
# was the pre 4.0 behavior.
#
6 changes: 5 additions & 1 deletion src/java/org/apache/cassandra/config/Config.java
@@ -20,6 +20,7 @@
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -37,6 +38,7 @@
import org.apache.cassandra.fql.FullQueryLoggerOptions;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.guardrails.GuardrailsConfig;
import org.apache.cassandra.io.compress.AdaptiveCompressor;
import org.apache.cassandra.utils.FBUtilities;

/**
@@ -290,8 +292,9 @@ public class Config
public double commitlog_sync_group_window_in_ms = Double.NaN;
public int commitlog_sync_period_in_ms;
public int commitlog_segment_size_in_mb = 32;

public ParameterizedClass commitlog_compression;
public FlushCompression flush_compression = FlushCompression.fast;
public FlushCompression flush_compression;
public int commitlog_max_compression_buffers_in_pool = 3;
public Integer periodic_commitlog_sync_lag_block_in_ms;
public TransparentDataEncryptionOptions transparent_data_encryption_options = new TransparentDataEncryptionOptions();
@@ -662,6 +665,7 @@ public enum FlushCompression
{
none,
fast,
adaptive,
table
}

11 changes: 9 additions & 2 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2081,15 +2081,22 @@ public static void setCommitLogCompression(ParameterizedClass compressor)

public static Config.FlushCompression getFlushCompression()
{
return conf.flush_compression;
return Objects.requireNonNullElseGet(conf.flush_compression, () -> shouldUseAdaptiveCompressionByDefault()
? Config.FlushCompression.adaptive
: Config.FlushCompression.fast);
}

public static void setFlushCompression(Config.FlushCompression compression)
{
conf.flush_compression = compression;
}

/**
public static boolean shouldUseAdaptiveCompressionByDefault()
{
return System.getProperty("cassandra.default_sstable_compression", "fast").equals("adaptive");
}

/**
* Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
* (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
* more, depending on how soon the sync policy stops all writing threads.
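A hedged illustration of the new default-selection logic above,
assuming no `flush_compression` entry is set in cassandra.yaml:

```java
// With -Dcassandra.default_sstable_compression=adaptive and
// flush_compression unset, flushes default to adaptive; otherwise fast.
System.setProperty("cassandra.default_sstable_compression", "adaptive");
assert DatabaseDescriptor.getFlushCompression() == Config.FlushCompression.adaptive;
```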
500 changes: 500 additions & 0 deletions src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions src/java/org/apache/cassandra/io/compress/ICompressor.java
@@ -83,4 +83,18 @@ default Set<Uses> recommendedUses()
{
return ImmutableSet.copyOf(EnumSet.allOf(Uses.class));
}

/**
* Returns the compressor configured for a particular use.
* Allows creating a compressor implementation that can handle multiple uses but requires different configurations
* adapted to a particular use.
* <p>
* May return this object.
* May not modify this object.
* Should return null if the request cannot be satisfied.
*/
default ICompressor forUse(Uses use)
{
return recommendedUses().contains(use) ? this : null;
}
}
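
A caller-side sketch of the `forUse` contract (the variable names are
assumptions; `CompressionParams.forUse` below follows the same pattern):

```java
// Ask the table's compressor for a variant suited to fast flush
// compression; fall back to the unmodified compressor when the
// request cannot be satisfied (forUse returns null).
ICompressor flushCompressor = tableCompressor.forUse(ICompressor.Uses.FAST_COMPRESSION);
if (flushCompressor == null)
    flushCompressor = tableCompressor;
```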
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

@@ -43,6 +44,7 @@
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.guardrails.Guardrails;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.AdaptiveCompressor;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.sstable.Component;
@@ -153,13 +155,21 @@ public static CompressionParams compressionFor(final OperationType opType, Table
case fast:
if (!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION))
{
// The default compressor is generally fast (LZ4 with 16KiB block size)
compressionParams = CompressionParams.DEFAULT;
compressionParams = CompressionParams.FAST;
break;
}
// else fall through
case adaptive:
if (!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION))
{
compressionParams = CompressionParams.FAST_ADAPTIVE;
break;
}
// else fall through
case table:
default:
compressionParams = Optional.ofNullable(compressionParams.forUse(ICompressor.Uses.FAST_COMPRESSION))
.orElse(compressionParams);
break;
}
}
36 changes: 35 additions & 1 deletion src/java/org/apache/cassandra/schema/CompressionParams.java
@@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

import com.google.common.annotations.VisibleForTesting;
@@ -35,6 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -66,12 +68,26 @@ public final class CompressionParams
public static final String ENABLED = "enabled";
public static final String MIN_COMPRESS_RATIO = "min_compress_ratio";

public static final CompressionParams DEFAULT = new CompressionParams(LZ4Compressor.create(Collections.<String, String>emptyMap()),
public static final CompressionParams FAST = new CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
DEFAULT_CHUNK_LENGTH,
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
DEFAULT_MIN_COMPRESS_RATIO,
Collections.emptyMap());

public static final CompressionParams ADAPTIVE = new CompressionParams(AdaptiveCompressor.create(Collections.emptyMap()),
DEFAULT_CHUNK_LENGTH,
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
DEFAULT_MIN_COMPRESS_RATIO,
Collections.emptyMap());

public static final CompressionParams FAST_ADAPTIVE = new CompressionParams(AdaptiveCompressor.createForFlush(Collections.emptyMap()),
DEFAULT_CHUNK_LENGTH,
calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
DEFAULT_MIN_COMPRESS_RATIO,
Collections.emptyMap());

public static final CompressionParams DEFAULT = DatabaseDescriptor.shouldUseAdaptiveCompressionByDefault() ? ADAPTIVE : FAST;

public static final CompressionParams NOOP = new CompressionParams(NoopCompressor.create(Collections.emptyMap()),
// 4 KiB is often the underlying disk block size
1024 * 4,
@@ -249,6 +265,24 @@ public boolean isEnabled()
return sstableCompressor != null;
}

/**
* Specializes the compressor for given use.
* May cause reconfiguration of parameters on some compressors.
* Returns null if params are not compatible with the given use.
*/
public CompressionParams forUse(ICompressor.Uses use)
{
ICompressor specializedCompressor = this.sstableCompressor.forUse(use);
if (specializedCompressor == null)
return null;

assert specializedCompressor.recommendedUses().contains(use);
if (specializedCompressor == sstableCompressor)
return this;

return new CompressionParams(specializedCompressor, chunkLength, maxCompressedLength, minCompressRatio, otherOptions);
}

/**
* Returns the SSTable compressor.
* @return the SSTable compressor or {@code null} if compression is disabled.
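A usage sketch mirroring the flush path in compressionFor above
(variable names assumed):

```java
// Specialize the table's params for fast (flush) compression when the
// compressor supports it; otherwise keep the table's own params.
CompressionParams flushParams = Optional.ofNullable(tableParams.forUse(ICompressor.Uses.FAST_COMPRESSION))
                                        .orElse(tableParams);
```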
147 changes: 147 additions & 0 deletions test/unit/org/apache/cassandra/io/compress/AdaptiveCompressorTest.java
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.io.compress;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;

import com.google.common.collect.ImmutableMap;
import org.junit.Test;

import org.apache.cassandra.io.util.RandomAccessReader;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;


public class AdaptiveCompressorTest
{

@Test(expected = IllegalArgumentException.class)
public void badCompressionLevelParamThrowsExceptionMin()
{
AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MIN_COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(AdaptiveCompressor.MIN_COMPRESSION_LEVEL - 1)));
}

@Test(expected = IllegalArgumentException.class)
public void badCompressionLevelParamThrowsExceptionMax()
{
AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MAX_COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(AdaptiveCompressor.MAX_COMPRESSION_LEVEL + 1)));
}

@Test(expected = IllegalArgumentException.class)
public void badMaxCompactionQueueLengthParamThrowsExceptionMin()
{
AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME, "-1"));
}

@Test
public void averageRelativeTimeCompressingIsMeasuredProperly() throws IOException, InterruptedException
{
var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 15, 15, 0);
AdaptiveCompressor c1 = new AdaptiveCompressor(params, () -> 0.0);
ByteBuffer src = getSrcByteBuffer();
ByteBuffer dest = getDstByteBuffer(c1);
for (int i = 0; i < 20000; i++)
{
compress(c1, src, dest);
}
assertTrue(c1.getThreadLocalState().getRelativeTimeSpentCompressing() > 0.8);
assertTrue(c1.getThreadLocalState().getRelativeTimeSpentCompressing() < 1.0);


var params2 = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 0, 0, 0);
AdaptiveCompressor c2 = new AdaptiveCompressor(params2, () -> 0.0);
for (int i = 0; i < 100; i++)
{
Thread.sleep(1);
compress(c2, src, dest);
}
assertTrue(c2.getThreadLocalState().getRelativeTimeSpentCompressing() < 0.02);
assertTrue(c2.getThreadLocalState().getRelativeTimeSpentCompressing() > 0.0);
}

@Test
public void compressionLevelAdaptsToWritePressure() throws IOException
{
var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 2, 8, 0);
double[] load = { 1.0 };

AdaptiveCompressor c = new AdaptiveCompressor(params, () -> load[0]);
ByteBuffer src = getSrcByteBuffer();
ByteBuffer dest = getDstByteBuffer(c);

for (int i = 0; i < 10; i++)
compress(c, src, dest);

assertEquals(2, c.getThreadLocalState().currentCompressionLevel);

// Low load; compression level must be increased back to max:
load[0] = 0L;

for (int i = 0; i < 10; i++)
compress(c, src, dest);

assertEquals(8, c.getThreadLocalState().currentCompressionLevel);
}

@Test
public void compressionLevelDoesNotDecreaseWhenCompressionIsNotABottleneck() throws IOException, InterruptedException
{
var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 2, 8, 0);
// Simulate high write load
AdaptiveCompressor c = new AdaptiveCompressor(params, () -> 1.0);
ByteBuffer src = getSrcByteBuffer();
ByteBuffer dest = getDstByteBuffer(c);

for (int i = 0; i < 200; i++)
{
Thread.sleep(1); // creates artificial bottleneck that is much slower than compression
compress(c, src, dest);
}

assertEquals(8, c.getThreadLocalState().currentCompressionLevel);
}

private static ByteBuffer getDstByteBuffer(ICompressor compressor)
{
return ByteBuffer.allocateDirect(compressor.initialCompressedBufferLength(RandomAccessReader.DEFAULT_BUFFER_SIZE));
}

private static ByteBuffer getSrcByteBuffer()
{
int n = RandomAccessReader.DEFAULT_BUFFER_SIZE;
byte[] srcData = new byte[n];
new Random().nextBytes(srcData);

ByteBuffer src = ByteBuffer.allocateDirect(n);
src.put(srcData, 0, n);
src.flip().position(0);
return src;
}

private static void compress(AdaptiveCompressor c, ByteBuffer src, ByteBuffer dest) throws IOException
{
c.compress(src, dest);
src.rewind();
dest.rewind();
}

}
33 changes: 33 additions & 0 deletions test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java
@@ -174,6 +174,39 @@ public void zstdFlushTest() throws Throwable
});
}

@Test
public void adaptiveFlushTest() throws Throwable
{
createTable("CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = {'sstable_compression': 'AdaptiveCompressor'};");
DatabaseDescriptor.setFlushCompression(Config.FlushCompression.fast);
ColumnFamilyStore store = flushTwice();

// Should flush as LZ4
Set<SSTableReader> sstables = store.getLiveSSTables();
sstables.forEach(sstable -> {
assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof LZ4Compressor);
});
store.truncateBlocking();

DatabaseDescriptor.setFlushCompression(Config.FlushCompression.adaptive);
store = flushTwice();

// Should flush as Adaptive
sstables = store.getLiveSSTables();
sstables.forEach(sstable -> {
assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof AdaptiveCompressor);
});

// Should compact to Adaptive
compact();

sstables = store.getLiveSSTables();
assertEquals(1, sstables.size());
store.getLiveSSTables().forEach(sstable -> {
assertTrue(sstable.getCompressionMetadata().parameters.getSstableCompressor() instanceof AdaptiveCompressor);
});
}

@Test
public void deflateFlushTest() throws Throwable
{