diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java index 41953975be..7e14379db6 100755 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java @@ -295,6 +295,12 @@ public VenicePushJob(String jobId, Properties vanillaProps) { Lazy.of(() -> ByteBuffer.wrap(ZstdWithDictCompressor.buildDictionaryOnSyntheticAvroData())); sharedTmpDir = new Path(pushJobSetting.sharedTmpDir); jobTmpDir = new Path(pushJobSetting.jobTmpDir); + String pushId = + pushJobSetting.jobStartTimeMs + "_" + props.getString(JOB_EXEC_URL, "failed_to_obtain_execution_url"); + if (pushJobSetting.isSourceKafka) { + pushId = pushJobSetting.repushTTLEnabled ? Version.generateTTLRePushId(pushId) : Version.generateRePushId(pushId); + } + pushJobDetails.pushId = pushId; } // This is a part of the public API. There is value in exposing this to users of VenicePushJob for reporting purposes @@ -521,7 +527,6 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) { Validate.isAssignableFrom(DataWriterComputeJob.class, objectClass); pushJobSettingToReturn.dataWriterComputeJobClass = objectClass; } - return pushJobSettingToReturn; } @@ -754,11 +759,7 @@ public void run() { } Optional optionalCompressionDictionary = getCompressionDictionary(); - String pushId = - pushJobSetting.jobStartTimeMs + "_" + props.getString(JOB_EXEC_URL, "failed_to_obtain_execution_url"); if (pushJobSetting.isSourceKafka) { - pushId = - pushJobSetting.repushTTLEnabled ? Version.generateTTLRePushId(pushId) : Version.generateRePushId(pushId); if (pushJobSetting.sourceKafkaInputVersionInfo.getHybridStoreConfig() != null && pushJobSetting.rewindTimeInSecondsOverride == NOT_SET) { pushJobSetting.rewindTimeInSecondsOverride = DEFAULT_RE_PUSH_REWIND_IN_SECONDS_OVERRIDE; @@ -785,12 +786,11 @@ public void run() { pushJobSetting, inputDataInfo.getInputFileDataSizeInBytes(), controllerClient, - pushId, + pushJobDetails.pushId.toString(), props, optionalCompressionDictionary); updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.NEW_VERSION_CREATED); // Update and send push job details with new info to the controller - pushJobDetails.pushId = pushId; pushJobDetails.partitionCount = pushJobSetting.partitionCount; pushJobDetails.valueCompressionStrategy = pushJobSetting.topicCompressionStrategy != null ? pushJobSetting.topicCompressionStrategy.getValue() diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java index 9391b16972..2314956959 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java @@ -174,6 +174,7 @@ public void testRepushTTLJobConfig() { PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); Assert.assertTrue(pushJobSetting.repushTTLEnabled); Assert.assertEquals(pushJobSetting.repushTTLStartTimeMs, -1); + Assert.assertTrue(Version.isPushIdTTLRePush(pushJob.getPushJobDetails().getPushId().toString())); } // Test with explicit TTL start timestamp @@ -184,6 +185,7 @@ public void testRepushTTLJobConfig() { PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); Assert.assertTrue(pushJobSetting.repushTTLEnabled); Assert.assertEquals(pushJobSetting.repushTTLStartTimeMs, 100); + Assert.assertTrue(Version.isPushIdTTLRePush(pushJob.getPushJobDetails().getPushId().toString())); } // Test with explicit TTL age @@ -217,6 +219,7 @@ public void testRepushTTLJobConfig() { PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); Assert.assertFalse(pushJobSetting.repushTTLEnabled); Assert.assertEquals(pushJobSetting.repushTTLStartTimeMs, -1); + Assert.assertTrue(Version.isPushIdRePush(pushJob.getPushJobDetails().getPushId().toString())); } }