Skip to content

Commit

Permalink
[vpj][controller] Do not optimize backup version deletion for TTL rep…
Browse files Browse the repository at this point in the history
…ush (#1325)

Today Venice deletes the repush source version as it is exactly same as repushed version immediately after the push. But for TTL repush since it differs from the source version this assumption is not correct. This PR prevents early deletion of TTL repush version and treats it as regular push.
Also it creates a new pushID for ttl repush which would help tracking regular repush vs ttl repush.
  • Loading branch information
majisourav99 authored Dec 19, 2024
1 parent cc10770 commit ed4776f
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,12 @@ public VenicePushJob(String jobId, Properties vanillaProps) {
Lazy.of(() -> ByteBuffer.wrap(ZstdWithDictCompressor.buildDictionaryOnSyntheticAvroData()));
sharedTmpDir = new Path(pushJobSetting.sharedTmpDir);
jobTmpDir = new Path(pushJobSetting.jobTmpDir);
String pushId =
pushJobSetting.jobStartTimeMs + "_" + props.getString(JOB_EXEC_URL, "failed_to_obtain_execution_url");
if (pushJobSetting.isSourceKafka) {
pushId = pushJobSetting.repushTTLEnabled ? Version.generateTTLRePushId(pushId) : Version.generateRePushId(pushId);
}
pushJobDetails.pushId = pushId;
}

// This is a part of the public API. There is value in exposing this to users of VenicePushJob for reporting purposes
Expand Down Expand Up @@ -521,7 +527,6 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) {
Validate.isAssignableFrom(DataWriterComputeJob.class, objectClass);
pushJobSettingToReturn.dataWriterComputeJobClass = objectClass;
}

return pushJobSettingToReturn;
}

Expand Down Expand Up @@ -754,10 +759,7 @@ public void run() {
}

Optional<ByteBuffer> optionalCompressionDictionary = getCompressionDictionary();
String pushId =
pushJobSetting.jobStartTimeMs + "_" + props.getString(JOB_EXEC_URL, "failed_to_obtain_execution_url");
if (pushJobSetting.isSourceKafka) {
pushId = Version.generateRePushId(pushId);
if (pushJobSetting.sourceKafkaInputVersionInfo.getHybridStoreConfig() != null
&& pushJobSetting.rewindTimeInSecondsOverride == NOT_SET) {
pushJobSetting.rewindTimeInSecondsOverride = DEFAULT_RE_PUSH_REWIND_IN_SECONDS_OVERRIDE;
Expand All @@ -784,12 +786,11 @@ public void run() {
pushJobSetting,
inputDataInfo.getInputFileDataSizeInBytes(),
controllerClient,
pushId,
pushJobDetails.pushId.toString(),
props,
optionalCompressionDictionary);
updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.NEW_VERSION_CREATED);
// Update and send push job details with new info to the controller
pushJobDetails.pushId = pushId;
pushJobDetails.partitionCount = pushJobSetting.partitionCount;
pushJobDetails.valueCompressionStrategy = pushJobSetting.topicCompressionStrategy != null
? pushJobSetting.topicCompressionStrategy.getValue()
Expand Down Expand Up @@ -1594,7 +1595,6 @@ private void initPushJobDetails() {
pushJobDetails.clusterName = pushJobSetting.clusterName;
pushJobDetails.overallStatus = new ArrayList<>();
pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.STARTED.getValue()));
pushJobDetails.pushId = "";
pushJobDetails.partitionCount = -1;
pushJobDetails.valueCompressionStrategy = CompressionStrategy.NO_OP.getValue();
pushJobDetails.chunkingEnabled = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public void testRepushTTLJobConfig() {
PushJobSetting pushJobSetting = pushJob.getPushJobSetting();
Assert.assertTrue(pushJobSetting.repushTTLEnabled);
Assert.assertEquals(pushJobSetting.repushTTLStartTimeMs, -1);
Assert.assertTrue(Version.isPushIdTTLRePush(pushJob.getPushJobDetails().getPushId().toString()));
}

// Test with explicit TTL start timestamp
Expand All @@ -184,6 +185,7 @@ public void testRepushTTLJobConfig() {
PushJobSetting pushJobSetting = pushJob.getPushJobSetting();
Assert.assertTrue(pushJobSetting.repushTTLEnabled);
Assert.assertEquals(pushJobSetting.repushTTLStartTimeMs, 100);
Assert.assertTrue(Version.isPushIdTTLRePush(pushJob.getPushJobDetails().getPushId().toString()));
}

// Test with explicit TTL age
Expand Down Expand Up @@ -217,6 +219,7 @@ public void testRepushTTLJobConfig() {
PushJobSetting pushJobSetting = pushJob.getPushJobSetting();
Assert.assertFalse(pushJobSetting.repushTTLEnabled);
Assert.assertEquals(pushJobSetting.repushTTLStartTimeMs, -1);
Assert.assertTrue(Version.isPushIdRePush(pushJob.getPushJobDetails().getPushId().toString()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public interface Version extends Comparable<Version>, DataModelBackedStructure<S
*/
String VENICE_RE_PUSH_PUSH_ID_PREFIX = "venice_re_push_";

String VENICE_TTL_RE_PUSH_PUSH_ID_PREFIX = "venice_ttl_re_push_";

/**
* Producer type for writing data to Venice
*/
Expand Down Expand Up @@ -427,6 +429,17 @@ static String generateRePushId(String pushId) {
return VENICE_RE_PUSH_PUSH_ID_PREFIX + pushId;
}

static String generateTTLRePushId(String pushId) {
return VENICE_TTL_RE_PUSH_PUSH_ID_PREFIX + pushId;
}

static boolean isPushIdTTLRePush(String pushId) {
if (pushId == null || pushId.isEmpty()) {
return false;
}
return pushId.startsWith(VENICE_TTL_RE_PUSH_PUSH_ID_PREFIX);
}

static boolean isPushIdRePush(String pushId) {
if (pushId == null || pushId.isEmpty()) {
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.venice.meta;

import static com.linkedin.venice.meta.Version.VENICE_TTL_RE_PUSH_PUSH_ID_PREFIX;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.utils.ObjectMapperFactory;
Expand Down Expand Up @@ -113,6 +115,8 @@ public void testIsATopicThatIsVersioned() {
Assert.assertTrue(Version.isATopicThatIsVersioned(topic));
topic = "abc_v1_cc";
Assert.assertTrue(Version.isATopicThatIsVersioned(topic));
String pushId = VENICE_TTL_RE_PUSH_PUSH_ID_PREFIX + System.currentTimeMillis();
Assert.assertTrue(Version.isPushIdTTLRePush(pushId));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2828,7 +2828,7 @@ private Pair<Boolean, Version> addVersion(

version.setViewConfigs(store.getViewConfigs());

if (repushSourceVersion > NON_EXISTING_VERSION) {
if (isRepush && repushSourceVersion > NON_EXISTING_VERSION) {
version.setRepushSourceVersion(repushSourceVersion);
}

Expand Down

0 comments on commit ed4776f

Please sign in to comment.