From ed4776f8140071a5c7610e169b851bd0089cde2d Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Thu, 19 Dec 2024 09:56:53 -0800 Subject: [PATCH] [vpj][controller] Do not optimize backup version deletion for TTL repush (#1325) Today Venice deletes the repush source version immediately after the push, as it is exactly the same as the repushed version. But for TTL repush this assumption is not correct, since the repushed version differs from the source version. This PR prevents early deletion of the TTL repush source version and treats it as a regular push. It also creates a new push ID for TTL repush, which helps distinguish a regular repush from a TTL repush. --- .../com/linkedin/venice/hadoop/VenicePushJob.java | 14 +++++++------- .../linkedin/venice/hadoop/VenicePushJobTest.java | 3 +++ .../java/com/linkedin/venice/meta/Version.java | 13 +++++++++++++ .../java/com/linkedin/venice/meta/TestVersion.java | 4 ++++ .../venice/controller/VeniceHelixAdmin.java | 2 +- 5 files changed, 28 insertions(+), 8 deletions(-) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java index bd7da010eac..d670aa39728 100755 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java @@ -295,6 +295,12 @@ public VenicePushJob(String jobId, Properties vanillaProps) { Lazy.of(() -> ByteBuffer.wrap(ZstdWithDictCompressor.buildDictionaryOnSyntheticAvroData())); sharedTmpDir = new Path(pushJobSetting.sharedTmpDir); jobTmpDir = new Path(pushJobSetting.jobTmpDir); + String pushId = + pushJobSetting.jobStartTimeMs + "_" + props.getString(JOB_EXEC_URL, "failed_to_obtain_execution_url"); + if (pushJobSetting.isSourceKafka) { + pushId = pushJobSetting.repushTTLEnabled ? 
Version.generateTTLRePushId(pushId) : Version.generateRePushId(pushId); + } + pushJobDetails.pushId = pushId; } // This is a part of the public API. There is value in exposing this to users of VenicePushJob for reporting purposes @@ -521,7 +527,6 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) { Validate.isAssignableFrom(DataWriterComputeJob.class, objectClass); pushJobSettingToReturn.dataWriterComputeJobClass = objectClass; } - return pushJobSettingToReturn; } @@ -754,10 +759,7 @@ public void run() { } Optional optionalCompressionDictionary = getCompressionDictionary(); - String pushId = - pushJobSetting.jobStartTimeMs + "_" + props.getString(JOB_EXEC_URL, "failed_to_obtain_execution_url"); if (pushJobSetting.isSourceKafka) { - pushId = Version.generateRePushId(pushId); if (pushJobSetting.sourceKafkaInputVersionInfo.getHybridStoreConfig() != null && pushJobSetting.rewindTimeInSecondsOverride == NOT_SET) { pushJobSetting.rewindTimeInSecondsOverride = DEFAULT_RE_PUSH_REWIND_IN_SECONDS_OVERRIDE; @@ -784,12 +786,11 @@ public void run() { pushJobSetting, inputDataInfo.getInputFileDataSizeInBytes(), controllerClient, - pushId, + pushJobDetails.pushId.toString(), props, optionalCompressionDictionary); updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.NEW_VERSION_CREATED); // Update and send push job details with new info to the controller - pushJobDetails.pushId = pushId; pushJobDetails.partitionCount = pushJobSetting.partitionCount; pushJobDetails.valueCompressionStrategy = pushJobSetting.topicCompressionStrategy != null ? 
pushJobSetting.topicCompressionStrategy.getValue() @@ -1594,7 +1595,6 @@ private void initPushJobDetails() { pushJobDetails.clusterName = pushJobSetting.clusterName; pushJobDetails.overallStatus = new ArrayList<>(); pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.STARTED.getValue())); - pushJobDetails.pushId = ""; pushJobDetails.partitionCount = -1; pushJobDetails.valueCompressionStrategy = CompressionStrategy.NO_OP.getValue(); pushJobDetails.chunkingEnabled = false; diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java index 9391b16972c..23149569596 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java @@ -174,6 +174,7 @@ public void testRepushTTLJobConfig() { PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); Assert.assertTrue(pushJobSetting.repushTTLEnabled); Assert.assertEquals(pushJobSetting.repushTTLStartTimeMs, -1); + Assert.assertTrue(Version.isPushIdTTLRePush(pushJob.getPushJobDetails().getPushId().toString())); } // Test with explicit TTL start timestamp @@ -184,6 +185,7 @@ public void testRepushTTLJobConfig() { PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); Assert.assertTrue(pushJobSetting.repushTTLEnabled); Assert.assertEquals(pushJobSetting.repushTTLStartTimeMs, 100); + Assert.assertTrue(Version.isPushIdTTLRePush(pushJob.getPushJobDetails().getPushId().toString())); } // Test with explicit TTL age @@ -217,6 +219,7 @@ public void testRepushTTLJobConfig() { PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); Assert.assertFalse(pushJobSetting.repushTTLEnabled); Assert.assertEquals(pushJobSetting.repushTTLStartTimeMs, -1); + 
Assert.assertTrue(Version.isPushIdRePush(pushJob.getPushJobDetails().getPushId().toString())); } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java index 406422e8ed2..c5092013141 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java @@ -36,6 +36,8 @@ public interface Version extends Comparable, DataModelBackedStructure addVersion( version.setViewConfigs(store.getViewConfigs()); - if (repushSourceVersion > NON_EXISTING_VERSION) { + if (isRepush && repushSourceVersion > NON_EXISTING_VERSION) { version.setRepushSourceVersion(repushSourceVersion); }