forked from apache/cassandra
-
Notifications
You must be signed in to change notification settings - Fork 22
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This commit introduces a new AdaptiveCompressor class. AdaptiveCompressor uses ZStandard compression with a dynamic compression level based on the current write load. AdaptiveCompressor's goal is to provide similar write performance as LZ4Compressor for write heavy workloads, but a significantly better compression ratio for databases with a moderate amount of writes or on systems with a lot of spare CPU power. If the memtable flush queue builds up, and it turns out the compression is a significant bottleneck, then the compression level used for flushing is decreased to gain speed. Similarly, when pending compaction tasks build up, then the compression level used for compaction is decreased. In order to enable adaptive compression: - set `-Dcassandra.default_sstable_compression=adaptive` JVM option to automatically select `AdaptiveCompressor` as the main compressor for flushes and new tables, if not overridden by specific options in cassandra.yaml or table schema - set `flush_compression: adaptive` in cassandra.yaml to enable it for flushing - set `AdaptiveCompressor` in Table options to enable it for compaction Caution: this feature is not turned on by default because it may impact read speed negatively in some rare cases. Fixes riptano/cndb#11532
- Loading branch information
Showing
10 changed files
with
754 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
481 changes: 481 additions & 0 deletions
481
src/java/org/apache/cassandra/io/compress/AdaptiveCompressor.java
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
147 changes: 147 additions & 0 deletions
147
test/unit/org/apache/cassandra/io/compress/AdaptiveCompressorTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.cassandra.io.compress; | ||
|
||
import java.io.IOException; | ||
import java.nio.ByteBuffer; | ||
import java.util.Random; | ||
|
||
import com.google.common.collect.ImmutableMap; | ||
import org.junit.Test; | ||
|
||
import org.apache.cassandra.io.util.RandomAccessReader; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertTrue; | ||
|
||
|
||
public class AdaptiveCompressorTest | ||
{ | ||
|
||
@Test(expected = IllegalArgumentException.class) | ||
public void badCompressionLevelParamThrowsExceptionMin() | ||
{ | ||
AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MIN_COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(AdaptiveCompressor.MIN_COMPRESSION_LEVEL - 1))); | ||
} | ||
|
||
@Test(expected = IllegalArgumentException.class) | ||
public void badCompressionLevelParamThrowsExceptionMax() | ||
{ | ||
AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MAX_COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(AdaptiveCompressor.MAX_COMPRESSION_LEVEL + 1))); | ||
} | ||
|
||
@Test(expected = IllegalArgumentException.class) | ||
public void badMaxCompactionQueueLengthParamThrowsExceptionMin() | ||
{ | ||
AdaptiveCompressor.create(ImmutableMap.of(AdaptiveCompressor.MAX_COMPACTION_QUEUE_LENGTH_OPTION_NAME, "-1")); | ||
} | ||
|
||
@Test | ||
public void averageRelativeTimeCompressingIsMeasuredProperly() throws IOException, InterruptedException | ||
{ | ||
var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 15, 15, 0); | ||
AdaptiveCompressor c1 = new AdaptiveCompressor(params, () -> 0.0); | ||
ByteBuffer src = getSrcByteBuffer(); | ||
ByteBuffer dest = getDstByteBuffer(c1); | ||
for (int i = 0; i < 20000; i++) | ||
{ | ||
compress(c1, src, dest); | ||
} | ||
assertTrue(c1.getThreadLocalState().getRelativeTimeSpentCompressing() > 0.8); | ||
assertTrue(c1.getThreadLocalState().getRelativeTimeSpentCompressing() < 1.0); | ||
|
||
|
||
var params2 = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 0, 0, 0); | ||
AdaptiveCompressor c2 = new AdaptiveCompressor(params2, () -> 0.0); | ||
for (int i = 0; i < 100; i++) | ||
{ | ||
Thread.sleep(1); | ||
compress(c2, src, dest); | ||
} | ||
assertTrue(c2.getThreadLocalState().getRelativeTimeSpentCompressing() < 0.02); | ||
assertTrue(c2.getThreadLocalState().getRelativeTimeSpentCompressing() > 0.0); | ||
} | ||
|
||
@Test | ||
public void compressionLevelAdaptsToWritePressure() throws IOException | ||
{ | ||
var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 2, 8, 0); | ||
double[] load = { 1.0 }; | ||
|
||
AdaptiveCompressor c = new AdaptiveCompressor(params, () -> load[0]); | ||
ByteBuffer src = getSrcByteBuffer(); | ||
ByteBuffer dest = getDstByteBuffer(c); | ||
|
||
for (int i = 0; i < 10; i++) | ||
compress(c, src, dest); | ||
|
||
assertEquals(2, c.getThreadLocalState().currentCompressionLevel); | ||
|
||
// Low load; compression level must be increased back to max: | ||
load[0] = 0L; | ||
|
||
for (int i = 0; i < 10; i++) | ||
compress(c, src, dest); | ||
|
||
assertEquals(8, c.getThreadLocalState().currentCompressionLevel); | ||
} | ||
|
||
@Test | ||
public void compressionLevelDoesNotDecreaseWhenCompressionIsNotABottleneck() throws IOException, InterruptedException | ||
{ | ||
var params = new AdaptiveCompressor.Params(ICompressor.Uses.GENERAL, 2, 8, 0); | ||
// Simulate high write load | ||
AdaptiveCompressor c = new AdaptiveCompressor(params, () -> 1.0); | ||
ByteBuffer src = getSrcByteBuffer(); | ||
ByteBuffer dest = getDstByteBuffer(c); | ||
|
||
for (int i = 0; i < 200; i++) | ||
{ | ||
Thread.sleep(1); // creates artificial bottleneck that is much slower than compression | ||
compress(c, src, dest); | ||
} | ||
|
||
assertEquals(8, c.getThreadLocalState().currentCompressionLevel); | ||
} | ||
|
||
private static ByteBuffer getDstByteBuffer(ICompressor compressor) | ||
{ | ||
return ByteBuffer.allocateDirect(compressor.initialCompressedBufferLength(RandomAccessReader.DEFAULT_BUFFER_SIZE)); | ||
} | ||
|
||
private static ByteBuffer getSrcByteBuffer() | ||
{ | ||
int n = RandomAccessReader.DEFAULT_BUFFER_SIZE; | ||
byte[] srcData = new byte[n]; | ||
new Random().nextBytes(srcData); | ||
|
||
ByteBuffer src = ByteBuffer.allocateDirect(n); | ||
src.put(srcData, 0, n); | ||
src.flip().position(0); | ||
return src; | ||
} | ||
|
||
private static void compress(AdaptiveCompressor c, ByteBuffer src, ByteBuffer dest) throws IOException | ||
{ | ||
c.compress(src, dest); | ||
src.rewind(); | ||
dest.rewind(); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.