Skip to content

Commit

Permalink
added unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
Sourav Maji committed Dec 18, 2024
1 parent 6611b82 commit 965d683
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,12 @@ public VenicePushJob(String jobId, Properties vanillaProps) {
Lazy.of(() -> ByteBuffer.wrap(ZstdWithDictCompressor.buildDictionaryOnSyntheticAvroData()));
sharedTmpDir = new Path(pushJobSetting.sharedTmpDir);
jobTmpDir = new Path(pushJobSetting.jobTmpDir);
String pushId =
pushJobSetting.jobStartTimeMs + "_" + props.getString(JOB_EXEC_URL, "failed_to_obtain_execution_url");
if (pushJobSetting.isSourceKafka) {
pushId = pushJobSetting.repushTTLEnabled ? Version.generateTTLRePushId(pushId) : Version.generateRePushId(pushId);
}
pushJobDetails.pushId = pushId;
}

// This is a part of the public API. There is value in exposing this to users of VenicePushJob for reporting purposes
Expand Down Expand Up @@ -521,7 +527,6 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) {
Validate.isAssignableFrom(DataWriterComputeJob.class, objectClass);
pushJobSettingToReturn.dataWriterComputeJobClass = objectClass;
}

return pushJobSettingToReturn;
}

Expand Down Expand Up @@ -754,11 +759,7 @@ public void run() {
}

Optional<ByteBuffer> optionalCompressionDictionary = getCompressionDictionary();
String pushId =
pushJobSetting.jobStartTimeMs + "_" + props.getString(JOB_EXEC_URL, "failed_to_obtain_execution_url");
if (pushJobSetting.isSourceKafka) {
pushId =
pushJobSetting.repushTTLEnabled ? Version.generateTTLRePushId(pushId) : Version.generateRePushId(pushId);
if (pushJobSetting.sourceKafkaInputVersionInfo.getHybridStoreConfig() != null
&& pushJobSetting.rewindTimeInSecondsOverride == NOT_SET) {
pushJobSetting.rewindTimeInSecondsOverride = DEFAULT_RE_PUSH_REWIND_IN_SECONDS_OVERRIDE;
Expand All @@ -785,12 +786,11 @@ public void run() {
pushJobSetting,
inputDataInfo.getInputFileDataSizeInBytes(),
controllerClient,
pushId,
pushJobDetails.pushId.toString(),
props,
optionalCompressionDictionary);
updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.NEW_VERSION_CREATED);
// Update and send push job details with new info to the controller
pushJobDetails.pushId = pushId;
pushJobDetails.partitionCount = pushJobSetting.partitionCount;
pushJobDetails.valueCompressionStrategy = pushJobSetting.topicCompressionStrategy != null
? pushJobSetting.topicCompressionStrategy.getValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public void testRepushTTLJobConfig() {
PushJobSetting pushJobSetting = pushJob.getPushJobSetting();
Assert.assertTrue(pushJobSetting.repushTTLEnabled);
Assert.assertEquals(pushJobSetting.repushTTLStartTimeMs, -1);
Assert.assertTrue(Version.isPushIdTTLRePush(pushJob.getPushJobDetails().getPushId().toString()));
}

// Test with explicit TTL start timestamp
Expand All @@ -184,6 +185,7 @@ public void testRepushTTLJobConfig() {
PushJobSetting pushJobSetting = pushJob.getPushJobSetting();
Assert.assertTrue(pushJobSetting.repushTTLEnabled);
Assert.assertEquals(pushJobSetting.repushTTLStartTimeMs, 100);
Assert.assertTrue(Version.isPushIdTTLRePush(pushJob.getPushJobDetails().getPushId().toString()));
}

// Test with explicit TTL age
Expand Down Expand Up @@ -217,6 +219,7 @@ public void testRepushTTLJobConfig() {
PushJobSetting pushJobSetting = pushJob.getPushJobSetting();
Assert.assertFalse(pushJobSetting.repushTTLEnabled);
Assert.assertEquals(pushJobSetting.repushTTLStartTimeMs, -1);
Assert.assertTrue(Version.isPushIdRePush(pushJob.getPushJobDetails().getPushId().toString()));
}
}

Expand Down

0 comments on commit 965d683

Please sign in to comment.