diff --git a/clients/da-vinci-client/build.gradle b/clients/da-vinci-client/build.gradle
index 0844960a43e..d7ef9f5547c 100644
--- a/clients/da-vinci-client/build.gradle
+++ b/clients/da-vinci-client/build.gradle
@@ -39,6 +39,7 @@ dependencies {
   implementation libraries.kafkaClients
   implementation libraries.rocksdbjni
   implementation libraries.zkclient // It's necessary to pull in the most recent version of zkclient explicitly, otherwise Helix won't have it...
+  implementation libraries.zstd

   testImplementation project(':internal:venice-test-common')
   testImplementation project(':internal:venice-client-common').sourceSets.test.output
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java
index b609f32b4f8..0650cf2f9e1 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java
@@ -109,7 +109,11 @@ public class VersionBackend {
         backend.getConfigLoader().getCombinedProperties().getInt(SERVER_STOP_CONSUMPTION_TIMEOUT_IN_SECONDS, 60);
     this.storeDeserializerCache = backend.getStoreOrThrow(store.getName()).getStoreDeserializerCache();
     this.compressor = Lazy.of(
-        () -> backend.getCompressorFactory().getCompressor(version.getCompressionStrategy(), version.kafkaTopicName()));
+        () -> backend.getCompressorFactory()
+            .getCompressor(
+                version.getCompressionStrategy(),
+                version.kafkaTopicName(),
+                config.getZstdDictCompressionLevel()));
     backend.getVersionByTopicMap().put(version.kafkaTopicName(), this);
     long daVinciPushStatusCheckIntervalInMs = this.config.getDaVinciPushStatusCheckIntervalInMs();
     if (daVinciPushStatusCheckIntervalInMs >= 0) {
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/compression/StorageEngineBackedCompressorFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/compression/StorageEngineBackedCompressorFactory.java
index 8cb206b1a2f..efb6866b170 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/compression/StorageEngineBackedCompressorFactory.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/compression/StorageEngineBackedCompressorFactory.java
@@ -8,16 +8,22 @@
 import com.linkedin.venice.compression.VeniceCompressor;
 import com.linkedin.venice.utils.ByteUtils;
 import java.nio.ByteBuffer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;


 public class StorageEngineBackedCompressorFactory extends CompressorFactory {
+  private static final Logger LOGGER = LogManager.getLogger(StorageEngineBackedCompressorFactory.class);
   private final StorageMetadataService metadataService;

   public StorageEngineBackedCompressorFactory(StorageMetadataService metadataService) {
     this.metadataService = metadataService;
   }

-  public VeniceCompressor getCompressor(CompressionStrategy compressionStrategy, String kafkaTopic) {
+  public VeniceCompressor getCompressor(
+      CompressionStrategy compressionStrategy,
+      String kafkaTopic,
+      int dictCompressionLevel) {
     if (ZSTD_WITH_DICT.equals(compressionStrategy)) {
       VeniceCompressor compressor = getVersionSpecificCompressor(kafkaTopic);
       if (compressor != null) {
@@ -28,10 +34,12 @@ public VeniceCompressor getCompressor(CompressionStrategy compressionStrategy, S
       if (dictionary == null) {
         throw new IllegalStateException("Got a null dictionary for: " + kafkaTopic);
       }
+      LOGGER.info("Creating a dict compressor with dict level: {} for topic: {}", dictCompressionLevel, kafkaTopic);
       return super.createVersionSpecificCompressorIfNotExist(
           compressionStrategy,
           kafkaTopic,
-          ByteUtils.extractByteArray(dictionary));
+          ByteUtils.extractByteArray(dictionary),
+          dictCompressionLevel);
     } else {
       return getCompressor(compressionStrategy);
     }
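Annotation (not part of the diff): the dictionary compression level ultimately maps onto zstd-jni's digested-dictionary API, where the level is baked into the dictionary object at construction time. A minimal sketch of that mechanism, assuming a zdict-trained `dictionary` byte array is available; names here are illustrative, not Venice's internal implementation:

```java
import com.github.luben.zstd.ZstdCompressCtx;
import com.github.luben.zstd.ZstdDictCompress;

public class DictCompressionSketch {
  // Compresses a payload against a shared dictionary at the requested level.
  // Higher levels trade CPU (slower compression) for a better compression ratio.
  public static byte[] compressWithDict(byte[] payload, byte[] dictionary, int level) {
    // The dictionary is pre-digested at the requested level; reusing the
    // ZstdDictCompress instance across calls amortizes that cost.
    ZstdDictCompress dictCompress = new ZstdDictCompress(dictionary, level);
    try (ZstdCompressCtx ctx = new ZstdCompressCtx()) {
      ctx.loadDict(dictCompress);
      return ctx.compress(payload);
    }
  }
}
```

This is also why the level has to be threaded all the way into `createVersionSpecificCompressorIfNotExist`: a cached per-version compressor holds a dictionary already digested at one specific level.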
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
index 2a104e93bf0..ef22554a628 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
@@ -153,6 +153,7 @@
 import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND;
 import static com.linkedin.venice.ConfigKeys.SERVER_SYSTEM_STORE_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
 import static com.linkedin.venice.ConfigKeys.SERVER_UNSUB_AFTER_BATCHPUSH;
+import static com.linkedin.venice.ConfigKeys.SERVER_ZSTD_DICT_COMPRESSION_LEVEL;
 import static com.linkedin.venice.ConfigKeys.SEVER_CALCULATE_QUOTA_USAGE_BASED_ON_PARTITIONS_ASSIGNMENT_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SORTED_INPUT_DRAINER_SIZE;
 import static com.linkedin.venice.ConfigKeys.STORE_WRITER_BUFFER_AFTER_LEADER_LOGIC_ENABLED;
@@ -168,6 +169,7 @@
 import static com.linkedin.venice.utils.ByteUtils.BYTES_PER_MB;
 import static com.linkedin.venice.utils.ByteUtils.generateHumanReadableByteCountString;

+import com.github.luben.zstd.Zstd;
 import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModelFactory;
 import com.linkedin.davinci.kafka.consumer.KafkaConsumerService;
 import com.linkedin.davinci.kafka.consumer.KafkaConsumerServiceDelegator;
@@ -555,6 +557,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
   private final int aaWCWorkloadParallelProcessingThreadPoolSize;
   private final boolean isGlobalRtDivEnabled;
   private final boolean nearlineWorkloadProducerThroughputOptimizationEnabled;
+  private final int zstdDictCompressionLevel;

   public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
     this(serverProperties, Collections.emptyMap());
@@ -929,6 +932,14 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<St

     nearlineWorkloadProducerThroughputOptimizationEnabled =
         serverProperties.getBoolean(SERVER_NEARLINE_WORKLOAD_PRODUCER_THROUGHPUT_OPTIMIZATION_ENABLED, true);
+    zstdDictCompressionLevel =
+        serverProperties.getInt(SERVER_ZSTD_DICT_COMPRESSION_LEVEL, Zstd.defaultCompressionLevel());
+    if (zstdDictCompressionLevel < Zstd.minCompressionLevel()
+        || zstdDictCompressionLevel > Zstd.maxCompressionLevel()) {
+      throw new VeniceException(
+          "Invalid zstd dict compression level: " + zstdDictCompressionLevel + " should be between "
+              + Zstd.minCompressionLevel() + " and " + Zstd.maxCompressionLevel());
+    }
   }

   long extractIngestionMemoryLimit(
@@ -1690,4 +1701,8 @@ public boolean isGlobalRtDivEnabled() {
   public boolean isNearlineWorkloadProducerThroughputOptimizationEnabled() {
     return nearlineWorkloadProducerThroughputOptimizationEnabled;
   }
+
+  public int getZstdDictCompressionLevel() {
+    return zstdDictCompressionLevel;
+  }
 }
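Annotation (not part of the diff): the new key is read once at server-config construction and validated against zstd's supported range, so a misconfigured level fails fast at startup rather than at first compression. A standalone sketch of that read-and-validate logic; the helper name and use of `IllegalArgumentException` in place of `VeniceException` are illustrative only:

```java
import java.util.Properties;

import com.github.luben.zstd.Zstd;

public class ZstdLevelValidationSketch {
  // Mirrors the validation added to the VeniceServerConfig constructor above.
  static int readLevel(Properties props) {
    String raw = props.getProperty("server.zstd.dict.compression.level");
    int level = (raw == null) ? Zstd.defaultCompressionLevel() : Integer.parseInt(raw);
    if (level < Zstd.minCompressionLevel() || level > Zstd.maxCompressionLevel()) {
      throw new IllegalArgumentException(
          "Invalid zstd dict compression level: " + level + " should be between "
              + Zstd.minCompressionLevel() + " and " + Zstd.maxCompressionLevel());
    }
    return level;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("server.zstd.dict.compression.level", "6");
    System.out.println(readLevel(props)); // prints 6
  }
}
```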
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
index 3b29023681d..cc8501f4408 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
@@ -3111,8 +3111,11 @@ protected ByteBuffer maybeCompressData(
     }
     if (shouldCompressData(partitionConsumptionState)) {
       try {
+        long startTimeInNS = System.nanoTime();
         // We need to expand the front of the returned bytebuffer to make room for schema header insertion
-        return compressor.get().compress(data, ByteUtils.SIZE_OF_INT);
+        ByteBuffer result = compressor.get().compress(data, ByteUtils.SIZE_OF_INT);
+        hostLevelIngestionStats.recordLeaderCompressLatency(LatencyUtils.getElapsedTimeFromNSToMS(startTimeInNS));
+        return result;
       } catch (IOException e) {
         // throw a loud exception if something goes wrong here
         throw new RuntimeException(
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
index aa035fec0e3..cf43fbebb94 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
@@ -498,7 +498,9 @@ public StoreIngestionTask(
     this.localKafkaClusterId = kafkaClusterUrlToIdMap.getOrDefault(localKafkaServer, Integer.MIN_VALUE);
     this.compressionStrategy = version.getCompressionStrategy();
     this.compressorFactory = builder.getCompressorFactory();
-    this.compressor = Lazy.of(() -> compressorFactory.getCompressor(compressionStrategy, kafkaVersionTopic));
+    this.compressor = Lazy.of(
+        () -> compressorFactory
+            .getCompressor(compressionStrategy, kafkaVersionTopic, serverConfig.getZstdDictCompressionLevel()));
     this.isChunked = version.isChunkingEnabled();
     this.isRmdChunked = version.isRmdChunkingEnabled();
     this.manifestSerializer = new ChunkedValueManifestSerializer(true);
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java
index 661ed0a967f..e59ba6ee792 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java
@@ -145,6 +145,7 @@ public class HostLevelIngestionStats extends AbstractVeniceStats {
   private final LongAdderRateGauge totalTombstoneCreationDCRRate;

   private final Sensor leaderProduceLatencySensor;
+  private final Sensor leaderCompressLatencySensor;
   private final LongAdderRateGauge batchProcessingRequestSensor;
   private final Sensor batchProcessingRequestSizeSensor;
   private final LongAdderRateGauge batchProcessingRequestRecordsSensor;
@@ -454,6 +455,11 @@ public HostLevelIngestionStats(
         totalStats,
         () -> totalStats.leaderProduceLatencySensor,
         avgAndMax());
+    this.leaderCompressLatencySensor = registerPerStoreAndTotalSensor(
+        "leader_compress_latency",
+        totalStats,
+        () -> totalStats.leaderCompressLatencySensor,
+        avgAndMax());
     this.batchProcessingRequestSensor = registerOnlyTotalRate(
         BATCH_PROCESSING_REQUEST,
         totalStats,
@@ -663,6 +669,10 @@ public void recordLeaderProduceLatency(double latency) {
     leaderProduceLatencySensor.record(latency);
   }

+  public void recordLeaderCompressLatency(double latency) {
+    leaderCompressLatencySensor.record(latency);
+  }
+
   public void recordBatchProcessingRequest(int size) {
     batchProcessingRequestSensor.record();
     batchProcessingRequestRecordsSensor.record(size);
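Annotation (not part of the diff): the new `leader_compress_latency` avg/max sensor is fed by the standard nanoTime bracket around the compress call. A self-contained sketch of that measurement pattern, assuming `LatencyUtils.getElapsedTimeFromNSToMS(start)` computes elapsed nanoseconds converted to milliseconds; the standalone equivalent below is illustrative, since that helper is pre-existing Venice code:

```java
public class CompressLatencySketch {
  public static void main(String[] args) throws Exception {
    long startTimeInNS = System.nanoTime();
    Thread.sleep(5); // stand-in for compressor.get().compress(data, ByteUtils.SIZE_OF_INT)
    // Presumed equivalent of LatencyUtils.getElapsedTimeFromNSToMS(startTimeInNS):
    double elapsedMs = (System.nanoTime() - startTimeInNS) / 1_000_000.0;
    System.out.printf("leader_compress_latency would record %.3f ms%n", elapsedMs);
  }
}
```

Capturing the result in a local and recording before `return` keeps the metric on the success path only; a failed compress falls through to the existing `IOException` handling without recording a latency sample.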
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/chunking/ChunkingTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/chunking/ChunkingTest.java
index d72a56c7b04..dcc211b0f01 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/chunking/ChunkingTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/chunking/ChunkingTest.java
@@ -4,6 +4,7 @@
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;

+import com.github.luben.zstd.Zstd;
 import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
 import com.linkedin.davinci.listener.response.NoOpReadResponseStats;
 import com.linkedin.davinci.storage.StorageMetadataService;
@@ -225,8 +226,10 @@ private void runTest(
     int readerSchemaId = schemaEntry.getId();
     try (StorageEngineBackedCompressorFactory compressorFactory =
         new StorageEngineBackedCompressorFactory(mock(StorageMetadataService.class))) {
-      VeniceCompressor compressor =
-          compressorFactory.getCompressor(CompressionStrategy.NO_OP, storageEngine.getStoreVersionName());
+      VeniceCompressor compressor = compressorFactory.getCompressor(
+          CompressionStrategy.NO_OP,
+          storageEngine.getStoreVersionName(),
+          Zstd.defaultCompressionLevel());
       Object retrievedObject;
       if (getWithSchemaId) {
         retrievedObject = chunkingAdapter.getWithSchemaId(
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
index dcadb283f27..451974a8d55 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
@@ -2326,4 +2326,6 @@ private ConfigKeys() {
    */
   public static final String SERVER_NEARLINE_WORKLOAD_PRODUCER_THROUGHPUT_OPTIMIZATION_ENABLED =
       "server.nearline.workload.producer.throughput.optimization.enabled";
+
+  public static final String SERVER_ZSTD_DICT_COMPRESSION_LEVEL = "server.zstd.dict.compression.level";
 }
diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/StorageReadRequestHandler.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/StorageReadRequestHandler.java
index cc9c0bc6e5d..3677ff6d646 100644
--- a/services/venice-server/src/main/java/com/linkedin/venice/listener/StorageReadRequestHandler.java
+++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/StorageReadRequestHandler.java
@@ -690,8 +690,10 @@ private static class ComputeRequestContext extends RequestContext {
       this.valueSchemaEntry = handler.getComputeValueSchema(request);
       this.resultSchema = handler.getComputeResultSchema(request.getComputeRequest(), valueSchemaEntry.getSchema());
       this.resultSerializer = handler.genericSerializerGetter.apply(resultSchema);
-      this.compressor = handler.compressorFactory
-          .getCompressor(storeVersion.storageEngine.getCompressionStrategy(), request.getResourceName());
+      this.compressor = handler.compressorFactory.getCompressor(
+          storeVersion.storageEngine.getCompressionStrategy(),
+          request.getResourceName(),
+          handler.serverConfig.getZstdDictCompressionLevel());
       this.operations = request.getComputeRequest().getOperations();
       this.operationResultFields = ComputeUtils.getOperationResultFields(operations, resultSchema);
     }
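Annotation (not part of the diff): because `getCompressor` gained a third `int` parameter, every Mockito stub of it must now match all three arguments; once any matcher is used, mixing in a raw value throws `InvalidUseOfMatchersException`, hence `any(), any(), anyInt()` in the test change below. A minimal sketch of the updated stubbing, assuming the usual static imports and that `NoopCompressor` lives in `com.linkedin.venice.compression`:

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.venice.compression.NoopCompressor;

public class CompressorStubSketch {
  static StorageEngineBackedCompressorFactory stubbedFactory() {
    StorageEngineBackedCompressorFactory factory = mock(StorageEngineBackedCompressorFactory.class);
    // All three arguments use matchers; the third must be anyInt() now that the
    // signature takes a compression level.
    doReturn(new NoopCompressor()).when(factory).getCompressor(any(), any(), anyInt());
    return factory;
  }
}
```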
diff --git a/services/venice-server/src/test/java/com/linkedin/venice/listener/StorageReadRequestHandlerTest.java b/services/venice-server/src/test/java/com/linkedin/venice/listener/StorageReadRequestHandlerTest.java
index 58d5aa61cf5..00913b1d9c0 100644
--- a/services/venice-server/src/test/java/com/linkedin/venice/listener/StorageReadRequestHandlerTest.java
+++ b/services/venice-server/src/test/java/com/linkedin/venice/listener/StorageReadRequestHandlerTest.java
@@ -202,7 +202,7 @@ public void setUp() {
     doReturn(partitionerConfig).when(version).getPartitionerConfig();
     doReturn(storageEngine).when(storageEngineRepository).getLocalStorageEngine(any());

-    doReturn(new NoopCompressor()).when(compressorFactory).getCompressor(any(), any());
+    doReturn(new NoopCompressor()).when(compressorFactory).getCompressor(any(), any(), anyInt());

     RocksDBServerConfig rocksDBServerConfig = mock(RocksDBServerConfig.class);
     doReturn(rocksDBServerConfig).when(serverConfig).getRocksDBServerConfig();
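Annotation (not part of the diff): taken together, every call site (Da Vinci version backends, ingestion tasks, and the server read path) now threads the server-level knob into the compressor factory. A sketch of the resulting call shape, assuming a `VeniceServerConfig` and factory are already in hand, and using 4 bytes of front padding (the value of `ByteUtils.SIZE_OF_INT`) as the ingestion path does for the schema header:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.VeniceCompressor;

public class CompressorUsageSketch {
  // kafkaTopic names a store version whose zstd dictionary is resolvable by the factory.
  static ByteBuffer compress(
      StorageEngineBackedCompressorFactory factory,
      VeniceServerConfig serverConfig,
      String kafkaTopic,
      ByteBuffer data) throws IOException {
    VeniceCompressor compressor = factory.getCompressor(
        CompressionStrategy.ZSTD_WITH_DICT,
        kafkaTopic,
        serverConfig.getZstdDictCompressionLevel());
    // 4 bytes of front padding leave room for the schema header, as in maybeCompressData().
    return compressor.compress(data, 4);
  }
}
```

Note the level only affects strategies that build a dictionary compressor; for `NO_OP` (as in ChunkingTest above) the extra argument is accepted but has no effect.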