From 92de3d3bbbed0ca55c0c6fef2520fa842e4e6091 Mon Sep 17 00:00:00 2001 From: Gaojie Liu Date: Tue, 19 Nov 2024 11:58:23 -0800 Subject: [PATCH] [server] Tuned zstd compression level in ingestion (#1322) Previously, Venice Server is always using the max compression level(22) to compress the payload, which is very slow and this PR will change the compression level to the default level (3) by default, and the compression speedup is roughly 10-20x and it might introduce some more storage overhead, 5-10%, but it should be fine. This PR doesn't change the compression level in batch push as it is offline, and if we see a need to speed up the map-reduce phase, we can do the same later. New Server Config: server.zstd.dict.compression.level: default 3 New metric: leader_compress_latency --- clients/da-vinci-client/build.gradle | 1 + .../java/com/linkedin/davinci/VersionBackend.java | 6 +++++- .../StorageEngineBackedCompressorFactory.java | 12 ++++++++++-- .../davinci/config/VeniceServerConfig.java | 15 +++++++++++++++ .../LeaderFollowerStoreIngestionTask.java | 5 ++++- .../kafka/consumer/StoreIngestionTask.java | 4 +++- .../davinci/stats/HostLevelIngestionStats.java | 10 ++++++++++ .../davinci/storage/chunking/ChunkingTest.java | 7 +++++-- .../main/java/com/linkedin/venice/ConfigKeys.java | 2 ++ .../listener/StorageReadRequestHandler.java | 6 ++++-- .../listener/StorageReadRequestHandlerTest.java | 2 +- 11 files changed, 60 insertions(+), 10 deletions(-) diff --git a/clients/da-vinci-client/build.gradle b/clients/da-vinci-client/build.gradle index 0844960a43e..d7ef9f5547c 100644 --- a/clients/da-vinci-client/build.gradle +++ b/clients/da-vinci-client/build.gradle @@ -39,6 +39,7 @@ dependencies { implementation libraries.kafkaClients implementation libraries.rocksdbjni implementation libraries.zkclient // It's necessary to pull in the most recent version of zkclient explicitly, otherwise Helix won't have it... + implementation libraries.zstd testImplementation project(':internal:venice-test-common') testImplementation project(':internal:venice-client-common').sourceSets.test.output diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java index b609f32b4f8..0650cf2f9e1 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java @@ -109,7 +109,11 @@ public class VersionBackend { backend.getConfigLoader().getCombinedProperties().getInt(SERVER_STOP_CONSUMPTION_TIMEOUT_IN_SECONDS, 60); this.storeDeserializerCache = backend.getStoreOrThrow(store.getName()).getStoreDeserializerCache(); this.compressor = Lazy.of( - () -> backend.getCompressorFactory().getCompressor(version.getCompressionStrategy(), version.kafkaTopicName())); + () -> backend.getCompressorFactory() + .getCompressor( + version.getCompressionStrategy(), + version.kafkaTopicName(), + config.getZstdDictCompressionLevel())); backend.getVersionByTopicMap().put(version.kafkaTopicName(), this); long daVinciPushStatusCheckIntervalInMs = this.config.getDaVinciPushStatusCheckIntervalInMs(); if (daVinciPushStatusCheckIntervalInMs >= 0) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/compression/StorageEngineBackedCompressorFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/compression/StorageEngineBackedCompressorFactory.java index 8cb206b1a2f..efb6866b170 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/compression/StorageEngineBackedCompressorFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/compression/StorageEngineBackedCompressorFactory.java @@ -8,16 +8,22 @@ import com.linkedin.venice.compression.VeniceCompressor; import com.linkedin.venice.utils.ByteUtils; import java.nio.ByteBuffer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class StorageEngineBackedCompressorFactory extends CompressorFactory { + private static final Logger LOGGER = LogManager.getLogger(StorageEngineBackedCompressorFactory.class); private final StorageMetadataService metadataService; public StorageEngineBackedCompressorFactory(StorageMetadataService metadataService) { this.metadataService = metadataService; } - public VeniceCompressor getCompressor(CompressionStrategy compressionStrategy, String kafkaTopic) { + public VeniceCompressor getCompressor( + CompressionStrategy compressionStrategy, + String kafkaTopic, + int dictCompressionLevel) { if (ZSTD_WITH_DICT.equals(compressionStrategy)) { VeniceCompressor compressor = getVersionSpecificCompressor(kafkaTopic); if (compressor != null) { @@ -28,10 +34,12 @@ public VeniceCompressor getCompressor(CompressionStrategy compressionStrategy, S if (dictionary == null) { throw new IllegalStateException("Got a null dictionary for: " + kafkaTopic); } + LOGGER.info("Creating a dict compressor with dict level: {} for topic: {}", dictCompressionLevel, kafkaTopic); return super.createVersionSpecificCompressorIfNotExist( compressionStrategy, kafkaTopic, - ByteUtils.extractByteArray(dictionary)); + ByteUtils.extractByteArray(dictionary), + dictCompressionLevel); } else { return getCompressor(compressionStrategy); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java index 2a104e93bf0..ef22554a628 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java @@ -153,6 +153,7 @@ import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND; import static com.linkedin.venice.ConfigKeys.SERVER_SYSTEM_STORE_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS; import static com.linkedin.venice.ConfigKeys.SERVER_UNSUB_AFTER_BATCHPUSH; +import static com.linkedin.venice.ConfigKeys.SERVER_ZSTD_DICT_COMPRESSION_LEVEL; import static com.linkedin.venice.ConfigKeys.SEVER_CALCULATE_QUOTA_USAGE_BASED_ON_PARTITIONS_ASSIGNMENT_ENABLED; import static com.linkedin.venice.ConfigKeys.SORTED_INPUT_DRAINER_SIZE; import static com.linkedin.venice.ConfigKeys.STORE_WRITER_BUFFER_AFTER_LEADER_LOGIC_ENABLED; @@ -168,6 +169,7 @@ import static com.linkedin.venice.utils.ByteUtils.BYTES_PER_MB; import static com.linkedin.venice.utils.ByteUtils.generateHumanReadableByteCountString; +import com.github.luben.zstd.Zstd; import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModelFactory; import com.linkedin.davinci.kafka.consumer.KafkaConsumerService; import com.linkedin.davinci.kafka.consumer.KafkaConsumerServiceDelegator; @@ -555,6 +557,7 @@ public class VeniceServerConfig extends VeniceClusterConfig { private final int aaWCWorkloadParallelProcessingThreadPoolSize; private final boolean isGlobalRtDivEnabled; private final boolean nearlineWorkloadProducerThroughputOptimizationEnabled; + private final int zstdDictCompressionLevel; public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException { this(serverProperties, Collections.emptyMap()); @@ -929,6 +932,14 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map Zstd.maxCompressionLevel()) { + throw new VeniceException( + "Invalid zstd dict compression level: " + zstdDictCompressionLevel + " should be between " + + Zstd.minCompressionLevel() + " and " + Zstd.maxCompressionLevel()); + } } long extractIngestionMemoryLimit( @@ -1690,4 +1701,8 @@ public boolean isGlobalRtDivEnabled() { public boolean isNearlineWorkloadProducerThroughputOptimizationEnabled() { return nearlineWorkloadProducerThroughputOptimizationEnabled; } + + public int getZstdDictCompressionLevel() { + return zstdDictCompressionLevel; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 3b29023681d..cc8501f4408 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -3111,8 +3111,11 @@ protected ByteBuffer maybeCompressData( } if (shouldCompressData(partitionConsumptionState)) { try { + long startTimeInNS = System.nanoTime(); // We need to expand the front of the returned bytebuffer to make room for schema header insertion - return compressor.get().compress(data, ByteUtils.SIZE_OF_INT); + ByteBuffer result = compressor.get().compress(data, ByteUtils.SIZE_OF_INT); + hostLevelIngestionStats.recordLeaderCompressLatency(LatencyUtils.getElapsedTimeFromNSToMS(startTimeInNS)); + return result; } catch (IOException e) { // throw a loud exception if something goes wrong here throw new RuntimeException( diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index aa035fec0e3..cf43fbebb94 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -498,7 +498,9 @@ public StoreIngestionTask( this.localKafkaClusterId = kafkaClusterUrlToIdMap.getOrDefault(localKafkaServer, Integer.MIN_VALUE); this.compressionStrategy = version.getCompressionStrategy(); this.compressorFactory = builder.getCompressorFactory(); - this.compressor = Lazy.of(() -> compressorFactory.getCompressor(compressionStrategy, kafkaVersionTopic)); + this.compressor = Lazy.of( + () -> compressorFactory + .getCompressor(compressionStrategy, kafkaVersionTopic, serverConfig.getZstdDictCompressionLevel())); this.isChunked = version.isChunkingEnabled(); this.isRmdChunked = version.isRmdChunkingEnabled(); this.manifestSerializer = new ChunkedValueManifestSerializer(true); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java index 661ed0a967f..e59ba6ee792 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java @@ -145,6 +145,7 @@ public class HostLevelIngestionStats extends AbstractVeniceStats { private final LongAdderRateGauge totalTombstoneCreationDCRRate; private final Sensor leaderProduceLatencySensor; + private final Sensor leaderCompressLatencySensor; private final LongAdderRateGauge batchProcessingRequestSensor; private final Sensor batchProcessingRequestSizeSensor; private final LongAdderRateGauge batchProcessingRequestRecordsSensor; @@ -454,6 +455,11 @@ public HostLevelIngestionStats( totalStats, () -> totalStats.leaderProduceLatencySensor, avgAndMax()); + this.leaderCompressLatencySensor = registerPerStoreAndTotalSensor( + "leader_compress_latency", + totalStats, + () -> totalStats.leaderCompressLatencySensor, + avgAndMax()); this.batchProcessingRequestSensor = registerOnlyTotalRate( BATCH_PROCESSING_REQUEST, totalStats, @@ -663,6 +669,10 @@ public void recordLeaderProduceLatency(double latency) { leaderProduceLatencySensor.record(latency); } + public void recordLeaderCompressLatency(double latency) { + leaderCompressLatencySensor.record(latency); + } + public void recordBatchProcessingRequest(int size) { batchProcessingRequestSensor.record(); batchProcessingRequestRecordsSensor.record(size); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/chunking/ChunkingTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/chunking/ChunkingTest.java index d72a56c7b04..dcc211b0f01 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/chunking/ChunkingTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/chunking/ChunkingTest.java @@ -4,6 +4,7 @@ import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; +import com.github.luben.zstd.Zstd; import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory; import com.linkedin.davinci.listener.response.NoOpReadResponseStats; import com.linkedin.davinci.storage.StorageMetadataService; @@ -225,8 +226,10 @@ private void runTest( int readerSchemaId = schemaEntry.getId(); try (StorageEngineBackedCompressorFactory compressorFactory = new StorageEngineBackedCompressorFactory(mock(StorageMetadataService.class))) { - VeniceCompressor compressor = - compressorFactory.getCompressor(CompressionStrategy.NO_OP, storageEngine.getStoreVersionName()); + VeniceCompressor compressor = compressorFactory.getCompressor( + CompressionStrategy.NO_OP, + storageEngine.getStoreVersionName(), + Zstd.defaultCompressionLevel()); Object retrievedObject; if (getWithSchemaId) { retrievedObject = chunkingAdapter.getWithSchemaId( diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index dcadb283f27..451974a8d55 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -2326,4 +2326,6 @@ private ConfigKeys() { */ public static final String SERVER_NEARLINE_WORKLOAD_PRODUCER_THROUGHPUT_OPTIMIZATION_ENABLED = "server.nearline.workload.producer.throughput.optimization.enabled"; + + public static final String SERVER_ZSTD_DICT_COMPRESSION_LEVEL = "server.zstd.dict.compression.level"; } diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/StorageReadRequestHandler.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/StorageReadRequestHandler.java index cc9c0bc6e5d..3677ff6d646 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/StorageReadRequestHandler.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/StorageReadRequestHandler.java @@ -690,8 +690,10 @@ private static class ComputeRequestContext extends RequestContext { this.valueSchemaEntry = handler.getComputeValueSchema(request); this.resultSchema = handler.getComputeResultSchema(request.getComputeRequest(), valueSchemaEntry.getSchema()); this.resultSerializer = handler.genericSerializerGetter.apply(resultSchema); - this.compressor = handler.compressorFactory - .getCompressor(storeVersion.storageEngine.getCompressionStrategy(), request.getResourceName()); + this.compressor = handler.compressorFactory.getCompressor( + storeVersion.storageEngine.getCompressionStrategy(), + request.getResourceName(), + handler.serverConfig.getZstdDictCompressionLevel()); this.operations = request.getComputeRequest().getOperations(); this.operationResultFields = ComputeUtils.getOperationResultFields(operations, resultSchema); } diff --git a/services/venice-server/src/test/java/com/linkedin/venice/listener/StorageReadRequestHandlerTest.java b/services/venice-server/src/test/java/com/linkedin/venice/listener/StorageReadRequestHandlerTest.java index 58d5aa61cf5..00913b1d9c0 100644 --- a/services/venice-server/src/test/java/com/linkedin/venice/listener/StorageReadRequestHandlerTest.java +++ b/services/venice-server/src/test/java/com/linkedin/venice/listener/StorageReadRequestHandlerTest.java @@ -202,7 +202,7 @@ public void setUp() { doReturn(partitionerConfig).when(version).getPartitionerConfig(); doReturn(storageEngine).when(storageEngineRepository).getLocalStorageEngine(any()); - doReturn(new NoopCompressor()).when(compressorFactory).getCompressor(any(), any()); + doReturn(new NoopCompressor()).when(compressorFactory).getCompressor(any(), any(), anyInt()); RocksDBServerConfig rocksDBServerConfig = mock(RocksDBServerConfig.class); doReturn(rocksDBServerConfig).when(serverConfig).getRocksDBServerConfig();