[server] Tuned zstd compression level in ingestion (#1322)
Previously, Venice Server always used the max compression level (22)
to compress the payload, which is very slow. This PR changes the
compression level to zstd's default level (3). The resulting
compression speedup is roughly 10-20x; it may introduce some
additional storage overhead (5-10%), which should be acceptable.

This PR doesn't change the compression level in batch push, since that
path runs offline; if we see a need to speed up the map-reduce phase,
we can make the same change there later.
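
For a rough sense of the tradeoff, the effect of the level is easy to reproduce
directly with zstd-jni (the library this PR wires into da-vinci-client). A minimal
standalone sketch, not Venice code; the class name and payload below are made up
for illustration:

  import com.github.luben.zstd.Zstd;
  import java.nio.charset.StandardCharsets;

  public class ZstdLevelSketch {
    public static void main(String[] args) {
      // Build a few MB of semi-repetitive data so zstd has patterns to exploit.
      StringBuilder sb = new StringBuilder();
      for (int i = 0; i < 200_000; i++) {
        sb.append("member_id=").append(i % 1000).append(",score=").append((i * 31) % 97).append(';');
      }
      byte[] payload = sb.toString().getBytes(StandardCharsets.UTF_8);

      // Compare zstd's default level (3) against the max level (22).
      for (int level : new int[] { Zstd.defaultCompressionLevel(), Zstd.maxCompressionLevel() }) {
        long startNs = System.nanoTime();
        byte[] compressed = Zstd.compress(payload, level);
        long elapsedMs = (System.nanoTime() - startNs) / 1_000_000;
        System.out.printf("level=%d: %d -> %d bytes in %d ms%n", level, payload.length, compressed.length, elapsedMs);
      }
    }
  }

The exact ratio and speedup depend on the payload, but the high levels are
dramatically slower for a modest size win, which is the tradeoff this PR makes.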

New Server Config:
server.zstd.dict.compression.level (default: 3)
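
For example, an operator who prefers a better compression ratio at the cost of
CPU could override it in the server properties (the value 9 here is arbitrary;
any level between Zstd.minCompressionLevel() and Zstd.maxCompressionLevel()
passes validation):

  server.zstd.dict.compression.level: 9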

New metric:
leader_compress_latency
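This metric is reported per store and in aggregate (avg and max), and measures
the milliseconds a leader spends compressing a payload in maybeCompressData.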
gaojieliu authored Nov 19, 2024
1 parent 1fc6e85 commit 92de3d3
Showing 11 changed files with 60 additions and 10 deletions.
1 change: 1 addition & 0 deletions clients/da-vinci-client/build.gradle
@@ -39,6 +39,7 @@ dependencies {
implementation libraries.kafkaClients
implementation libraries.rocksdbjni
implementation libraries.zkclient // It's necessary to pull in the most recent version of zkclient explicitly, otherwise Helix won't have it...
+ implementation libraries.zstd

testImplementation project(':internal:venice-test-common')
testImplementation project(':internal:venice-client-common').sourceSets.test.output
@@ -109,7 +109,11 @@ public class VersionBackend {
backend.getConfigLoader().getCombinedProperties().getInt(SERVER_STOP_CONSUMPTION_TIMEOUT_IN_SECONDS, 60);
this.storeDeserializerCache = backend.getStoreOrThrow(store.getName()).getStoreDeserializerCache();
this.compressor = Lazy.of(
- () -> backend.getCompressorFactory().getCompressor(version.getCompressionStrategy(), version.kafkaTopicName()));
+ () -> backend.getCompressorFactory()
+     .getCompressor(
+         version.getCompressionStrategy(),
+         version.kafkaTopicName(),
+         config.getZstdDictCompressionLevel()));
backend.getVersionByTopicMap().put(version.kafkaTopicName(), this);
long daVinciPushStatusCheckIntervalInMs = this.config.getDaVinciPushStatusCheckIntervalInMs();
if (daVinciPushStatusCheckIntervalInMs >= 0) {
@@ -8,16 +8,22 @@
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.utils.ByteUtils;
import java.nio.ByteBuffer;
+ import org.apache.logging.log4j.LogManager;
+ import org.apache.logging.log4j.Logger;


public class StorageEngineBackedCompressorFactory extends CompressorFactory {
+ private static final Logger LOGGER = LogManager.getLogger(StorageEngineBackedCompressorFactory.class);
private final StorageMetadataService metadataService;

public StorageEngineBackedCompressorFactory(StorageMetadataService metadataService) {
this.metadataService = metadataService;
}

- public VeniceCompressor getCompressor(CompressionStrategy compressionStrategy, String kafkaTopic) {
+ public VeniceCompressor getCompressor(
+     CompressionStrategy compressionStrategy,
+     String kafkaTopic,
+     int dictCompressionLevel) {
if (ZSTD_WITH_DICT.equals(compressionStrategy)) {
VeniceCompressor compressor = getVersionSpecificCompressor(kafkaTopic);
if (compressor != null) {
@@ -28,10 +34,12 @@ public VeniceCompressor getCompressor(CompressionStrategy compressionStrategy, S
if (dictionary == null) {
throw new IllegalStateException("Got a null dictionary for: " + kafkaTopic);
}
+ LOGGER.info("Creating a dict compressor with dict level: {} for topic: {}", dictCompressionLevel, kafkaTopic);
return super.createVersionSpecificCompressorIfNotExist(
compressionStrategy,
kafkaTopic,
- ByteUtils.extractByteArray(dictionary));
+ ByteUtils.extractByteArray(dictionary),
+ dictCompressionLevel);
} else {
return getCompressor(compressionStrategy);
}
@@ -153,6 +153,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_SYSTEM_STORE_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_UNSUB_AFTER_BATCHPUSH;
+ import static com.linkedin.venice.ConfigKeys.SERVER_ZSTD_DICT_COMPRESSION_LEVEL;
import static com.linkedin.venice.ConfigKeys.SEVER_CALCULATE_QUOTA_USAGE_BASED_ON_PARTITIONS_ASSIGNMENT_ENABLED;
import static com.linkedin.venice.ConfigKeys.SORTED_INPUT_DRAINER_SIZE;
import static com.linkedin.venice.ConfigKeys.STORE_WRITER_BUFFER_AFTER_LEADER_LOGIC_ENABLED;
@@ -168,6 +169,7 @@
import static com.linkedin.venice.utils.ByteUtils.BYTES_PER_MB;
import static com.linkedin.venice.utils.ByteUtils.generateHumanReadableByteCountString;

+ import com.github.luben.zstd.Zstd;
import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModelFactory;
import com.linkedin.davinci.kafka.consumer.KafkaConsumerService;
import com.linkedin.davinci.kafka.consumer.KafkaConsumerServiceDelegator;
@@ -555,6 +557,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final int aaWCWorkloadParallelProcessingThreadPoolSize;
private final boolean isGlobalRtDivEnabled;
private final boolean nearlineWorkloadProducerThroughputOptimizationEnabled;
+ private final int zstdDictCompressionLevel;

public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
this(serverProperties, Collections.emptyMap());
@@ -929,6 +932,14 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getInt(SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_THREAD_POOL_SIZE, 8);
nearlineWorkloadProducerThroughputOptimizationEnabled =
serverProperties.getBoolean(SERVER_NEARLINE_WORKLOAD_PRODUCER_THROUGHPUT_OPTIMIZATION_ENABLED, true);
+ zstdDictCompressionLevel =
+     serverProperties.getInt(SERVER_ZSTD_DICT_COMPRESSION_LEVEL, Zstd.defaultCompressionLevel());
+ if (zstdDictCompressionLevel < Zstd.minCompressionLevel()
+     || zstdDictCompressionLevel > Zstd.maxCompressionLevel()) {
+   throw new VeniceException(
+       "Invalid zstd dict compression level: " + zstdDictCompressionLevel + " should be between "
+           + Zstd.minCompressionLevel() + " and " + Zstd.maxCompressionLevel());
+ }
}

long extractIngestionMemoryLimit(
@@ -1690,4 +1701,8 @@ public boolean isGlobalRtDivEnabled() {
public boolean isNearlineWorkloadProducerThroughputOptimizationEnabled() {
return nearlineWorkloadProducerThroughputOptimizationEnabled;
}

+ public int getZstdDictCompressionLevel() {
+   return zstdDictCompressionLevel;
+ }
}
@@ -3111,8 +3111,11 @@ protected ByteBuffer maybeCompressData(
}
if (shouldCompressData(partitionConsumptionState)) {
try {
+ long startTimeInNS = System.nanoTime();
// We need to expand the front of the returned bytebuffer to make room for schema header insertion
- return compressor.get().compress(data, ByteUtils.SIZE_OF_INT);
+ ByteBuffer result = compressor.get().compress(data, ByteUtils.SIZE_OF_INT);
+ hostLevelIngestionStats.recordLeaderCompressLatency(LatencyUtils.getElapsedTimeFromNSToMS(startTimeInNS));
+ return result;
} catch (IOException e) {
// throw a loud exception if something goes wrong here
throw new RuntimeException(
@@ -498,7 +498,9 @@ public StoreIngestionTask(
this.localKafkaClusterId = kafkaClusterUrlToIdMap.getOrDefault(localKafkaServer, Integer.MIN_VALUE);
this.compressionStrategy = version.getCompressionStrategy();
this.compressorFactory = builder.getCompressorFactory();
- this.compressor = Lazy.of(() -> compressorFactory.getCompressor(compressionStrategy, kafkaVersionTopic));
+ this.compressor = Lazy.of(
+     () -> compressorFactory
+         .getCompressor(compressionStrategy, kafkaVersionTopic, serverConfig.getZstdDictCompressionLevel()));
this.isChunked = version.isChunkingEnabled();
this.isRmdChunked = version.isRmdChunkingEnabled();
this.manifestSerializer = new ChunkedValueManifestSerializer(true);
@@ -145,6 +145,7 @@ public class HostLevelIngestionStats extends AbstractVeniceStats {
private final LongAdderRateGauge totalTombstoneCreationDCRRate;

private final Sensor leaderProduceLatencySensor;
+ private final Sensor leaderCompressLatencySensor;
private final LongAdderRateGauge batchProcessingRequestSensor;
private final Sensor batchProcessingRequestSizeSensor;
private final LongAdderRateGauge batchProcessingRequestRecordsSensor;
@@ -454,6 +455,11 @@ public HostLevelIngestionStats(
totalStats,
() -> totalStats.leaderProduceLatencySensor,
avgAndMax());
+ this.leaderCompressLatencySensor = registerPerStoreAndTotalSensor(
+     "leader_compress_latency",
+     totalStats,
+     () -> totalStats.leaderCompressLatencySensor,
+     avgAndMax());
this.batchProcessingRequestSensor = registerOnlyTotalRate(
BATCH_PROCESSING_REQUEST,
totalStats,
@@ -663,6 +669,10 @@ public void recordLeaderProduceLatency(double latency) {
leaderProduceLatencySensor.record(latency);
}

+ public void recordLeaderCompressLatency(double latency) {
+   leaderCompressLatencySensor.record(latency);
+ }

public void recordBatchProcessingRequest(int size) {
batchProcessingRequestSensor.record();
batchProcessingRequestRecordsSensor.record(size);
@@ -4,6 +4,7 @@
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;

+ import com.github.luben.zstd.Zstd;
import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.listener.response.NoOpReadResponseStats;
import com.linkedin.davinci.storage.StorageMetadataService;
@@ -225,8 +226,10 @@ private void runTest(
int readerSchemaId = schemaEntry.getId();
try (StorageEngineBackedCompressorFactory compressorFactory =
new StorageEngineBackedCompressorFactory(mock(StorageMetadataService.class))) {
- VeniceCompressor compressor =
-     compressorFactory.getCompressor(CompressionStrategy.NO_OP, storageEngine.getStoreVersionName());
+ VeniceCompressor compressor = compressorFactory.getCompressor(
+     CompressionStrategy.NO_OP,
+     storageEngine.getStoreVersionName(),
+     Zstd.defaultCompressionLevel());
Object retrievedObject;
if (getWithSchemaId) {
retrievedObject = chunkingAdapter.getWithSchemaId(
@@ -2326,4 +2326,6 @@ private ConfigKeys() {
*/
public static final String SERVER_NEARLINE_WORKLOAD_PRODUCER_THROUGHPUT_OPTIMIZATION_ENABLED =
"server.nearline.workload.producer.throughput.optimization.enabled";

+ public static final String SERVER_ZSTD_DICT_COMPRESSION_LEVEL = "server.zstd.dict.compression.level";
}
@@ -690,8 +690,10 @@ private static class ComputeRequestContext extends RequestContext {
this.valueSchemaEntry = handler.getComputeValueSchema(request);
this.resultSchema = handler.getComputeResultSchema(request.getComputeRequest(), valueSchemaEntry.getSchema());
this.resultSerializer = handler.genericSerializerGetter.apply(resultSchema);
- this.compressor = handler.compressorFactory
-     .getCompressor(storeVersion.storageEngine.getCompressionStrategy(), request.getResourceName());
+ this.compressor = handler.compressorFactory.getCompressor(
+     storeVersion.storageEngine.getCompressionStrategy(),
+     request.getResourceName(),
+     handler.serverConfig.getZstdDictCompressionLevel());
this.operations = request.getComputeRequest().getOperations();
this.operationResultFields = ComputeUtils.getOperationResultFields(operations, resultSchema);
}
@@ -202,7 +202,7 @@ public void setUp() {
doReturn(partitionerConfig).when(version).getPartitionerConfig();

doReturn(storageEngine).when(storageEngineRepository).getLocalStorageEngine(any());
- doReturn(new NoopCompressor()).when(compressorFactory).getCompressor(any(), any());
+ doReturn(new NoopCompressor()).when(compressorFactory).getCompressor(any(), any(), anyInt());

RocksDBServerConfig rocksDBServerConfig = mock(RocksDBServerConfig.class);
doReturn(rocksDBServerConfig).when(serverConfig).getRocksDBServerConfig();