Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[vpj][controller] Add deferred swap for target region push #1375

Merged
merged 20 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class PushJobSetting implements Serializable {
public boolean d2Routing;
public String targetedRegions;
public boolean isTargetedRegionPushEnabled;
public boolean isTargetRegionPushWithDeferredSwapEnabled;
public boolean isSystemSchemaReaderEnabled;
public String systemSchemaClusterD2ServiceName;
public String systemSchemaClusterD2ZKHost;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import static com.linkedin.venice.vpj.VenicePushJobConstants.SYSTEM_SCHEMA_READER_ENABLED;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_ENABLED;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_LIST;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TEMP_DIR_PREFIX;
import static com.linkedin.venice.vpj.VenicePushJobConstants.UNCREATED_VERSION_NUMBER;
import static com.linkedin.venice.vpj.VenicePushJobConstants.USE_MAPPER_TO_BUILD_DICTIONARY;
Expand Down Expand Up @@ -502,6 +503,14 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) {
pushJobSettingToReturn.dataWriterComputeJobClass = objectClass;
}

// If target region push with deferred version swap is enabled, enable deferVersionSwap and
// isTargetedRegionPushEnabled
if (props.getBoolean(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, false)) {
pushJobSettingToReturn.deferVersionSwap = true;
pushJobSettingToReturn.isTargetedRegionPushEnabled = true;
pushJobSettingToReturn.isTargetRegionPushWithDeferredSwapEnabled = true;
}

return pushJobSettingToReturn;
}

Expand Down Expand Up @@ -851,7 +860,7 @@ public void run() {
sendPushJobDetailsToController();

// only kick off the validation and post-validation flow when everything has to be done in a single VPJ
if (!pushJobSetting.isTargetedRegionPushEnabled) {
if (!pushJobSetting.isTargetedRegionPushEnabled || pushJobSetting.isTargetRegionPushWithDeferredSwapEnabled) {
return;
}

Expand Down Expand Up @@ -2075,7 +2084,8 @@ private void validateStoreSettingAndPopulate(ControllerClient controllerClient,
jobSetting.storeStorageQuota = storeResponse.getStore().getStorageQuotaInByte();

// Do not enable for deferred swap or hybrid store
if (pushJobSetting.deferVersionSwap || storeResponse.getStore().getHybridStoreConfig() != null) {
boolean isDeferredSwap = pushJobSetting.deferVersionSwap && !pushJobSetting.isTargetedRegionPushEnabled;
if (isDeferredSwap || storeResponse.getStore().getHybridStoreConfig() != null) {
LOGGER.warn(
"target region is not available for {} as it hybrid or deferred version swap enabled.",
jobSetting.storeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,14 @@ private VenicePushJobConstants() {
*/
public static final String TARGETED_REGION_PUSH_LIST = "targeted.region.push.list";

/**
* Config to enable target region push with deferred version swap.
* In this mode, the VPJ will push data to all regions and only switch to the new version in a single region.
* The single region is decided by the store config in {@link StoreInfo#getNativeReplicationSourceFabric()}.
misyel marked this conversation as resolved.
Show resolved Hide resolved
* After a specified wait time (default 1h), the remaining regions will switch to the new version.
*/
public static final String TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP = "targeted.region.push.with.deferred.swap";

public static final boolean DEFAULT_IS_DUPLICATED_KEY_ALLOWED = false;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static com.linkedin.venice.vpj.VenicePushJobConstants.SYSTEM_SCHEMA_READER_ENABLED;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_ENABLED;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_LIST;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_FIELD_PROP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_DISCOVER_URL_PROP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_STORE_NAME_PROP;
Expand Down Expand Up @@ -923,6 +924,20 @@ public void testGetVeniceWriter() {
}
}

/**
 * Verifies that setting {@code TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP} to {@code true} turns on all three
 * flags that the push job derives from it: {@code deferVersionSwap}, {@code isTargetedRegionPushEnabled} and
 * {@code isTargetRegionPushWithDeferredSwapEnabled}.
 */
@Test
public void testTargetRegionPushWithDeferredSwapSettings() {
  Properties props = getVpjRequiredProperties();
  props.put(KEY_FIELD_PROP, "id");
  props.put(VALUE_FIELD_PROP, "name");
  props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true);

  try (VenicePushJob pushJob = getSpyVenicePushJob(props, getClient())) {
    PushJobSetting pushJobSetting = pushJob.getPushJobSetting();
    Assert.assertTrue(pushJobSetting.deferVersionSwap, "deferVersionSwap should be enabled");
    Assert.assertTrue(pushJobSetting.isTargetedRegionPushEnabled, "isTargetedRegionPushEnabled should be enabled");
    // This flag is what makes the push job skip the post-validation flow, so it must be covered as well.
    Assert.assertTrue(
        pushJobSetting.isTargetRegionPushWithDeferredSwapEnabled,
        "isTargetRegionPushWithDeferredSwapEnabled should be enabled");
  }
}

private JobStatusQueryResponse mockJobStatusQuery() {
JobStatusQueryResponse response = new JobStatusQueryResponse();
response.setStatus(ExecutionStatus.COMPLETED.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,4 +348,13 @@ public void fixMissingFields() {
}
}
}

/**
 * Sets the Da Vinci heartbeat-reported flag on every store version whose number equals
 * {@code versionNumber}. Versions with a different number are left untouched; if no version matches,
 * nothing is changed.
 *
 * @param versionNumber number of the version to update
 * @param reported value to store in the version's heartbeat-reported flag
 */
@Override
public void updateVersionForDaVinciHeartbeat(int versionNumber, boolean reported) {
  for (StoreVersion candidate: storeVersionsSupplier.getForUpdate()) {
    if (candidate.getNumber() != versionNumber) {
      continue;
    }
    candidate.setIsDaVinciHeartBeatReported(reported);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1497,6 +1497,11 @@ public boolean getIsDavinciHeartbeatReported() {
return delegate.getIsDavinciHeartbeatReported();
}

/**
 * Not supported by this implementation: always throws.
 * NOTE(review): the neighbouring methods delegate reads to {@code delegate}, so this looks like a read-only
 * wrapper that rejects mutation — confirm against the enclosing class.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
public void updateVersionForDaVinciHeartbeat(int versionNumber, boolean reported) {
throw new UnsupportedOperationException();
}

@Override
public String toString() {
return this.delegate.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,4 +341,6 @@ static boolean isSystemStore(String storeName) {
void setIsDavinciHeartbeatReported(boolean isReported);

boolean getIsDavinciHeartbeatReported();

/**
 * Updates the Da Vinci heartbeat-reported flag of the version identified by {@code versionNumber}.
 * NOTE(review): implementations may reject this call (for example with
 * {@link UnsupportedOperationException}) — confirm per implementation.
 *
 * @param versionNumber number of the store version to update
 * @param reported whether a Da Vinci heartbeat has been reported for that version
 */
void updateVersionForDaVinciHeartbeat(int versionNumber, boolean reported);
majisourav99 marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static com.linkedin.venice.vpj.VenicePushJobConstants.SEND_CONTROL_MESSAGES_DIRECTLY;
import static com.linkedin.venice.vpj.VenicePushJobConstants.SOURCE_KAFKA;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_ENABLED;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;

Expand Down Expand Up @@ -866,6 +867,52 @@ public void testTargetedRegionPushJobFullConsumptionForHybridStore() throws Exce
});
}

/**
 * Runs a regular push followed by a target region push with deferred version swap and checks the current
 * version reported per region: after the first push every region is on version 1; after the second push only
 * the region equal to {@code DEFAULT_NATIVE_REPLICATION_SOURCE} is on version 2 and the others stay on 1.
 */
@Test(timeOut = TEST_TIMEOUT)
public void testTargetRegionPushWithDeferredVersionSwap() throws Exception {
  motherOfAllTests(
      "testTargetRegionPushWithDeferredVersionSwap",
      storeParams -> storeParams.setPartitionCount(1),
      100,
      (parentControllerClient, clusterName, storeName, props, inputDir) -> {
        // Step 1: a plain push, which is expected to bring every region to version 1.
        try (VenicePushJob regularPush = new VenicePushJob("Test regular push job", props)) {
          regularPush.run();

          // The kafka URL handed back to the push job must be the dc-0 kafka URL.
          Assert.assertEquals(
              regularPush.getKafkaUrl(),
              childDatacenters.get(0).getKafkaBrokerWrapper().getAddress());

          TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
            Map<String, Integer> currentVersions =
                parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions();
            // Every data center should now serve version 1.
            currentVersions.values().forEach(currentVersion -> Assert.assertEquals((int) currentVersion, 1));
          });
        }

        // Step 2: a target region push with deferred swap.
        props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true);
        try (VenicePushJob deferredSwapPush =
            new VenicePushJob("Test target region push w. deferred version swap", props)) {
          deferredSwapPush.run();

          TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
            Map<String, Integer> currentVersions =
                parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions();

            for (Map.Entry<String, Integer> regionAndVersion: currentVersions.entrySet()) {
              // Only the native replication source region is swapped to version 2; the rest remain on 1.
              boolean isSwapRegion = regionAndVersion.getKey().equals(DEFAULT_NATIVE_REPLICATION_SOURCE);
              int expectedVersion = isSwapRegion ? 2 : 1;
              Assert.assertEquals((int) regionAndVersion.getValue(), expectedVersion);
            }
          });
        }
      });
}

private interface NativeReplicationTest {
void run(
ControllerClient parentControllerClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1986,12 +1986,39 @@ private void checkPreConditionForSingleVersionDeletion(
}
}

/**
 * Convenience overload that takes no targeted-regions argument. It forwards every argument unchanged to the
 * overload that accepts {@code targetedRegions}, passing {@code null} for that parameter.
 */
@Override
public void addVersionAndStartIngestion(
    String clusterName,
    String storeName,
    String pushJobId,
    int versionNumber,
    int numberOfPartitions,
    PushType pushType,
    String remoteKafkaBootstrapServers,
    long rewindTimeInSecondsOverride,
    int replicationMetadataVersionId,
    boolean versionSwapDeferred,
    int repushSourceVersion) {
  // This overload has no targeted regions to pass on.
  final String noTargetedRegions = null;
  addVersionAndStartIngestion(
      clusterName,
      storeName,
      pushJobId,
      versionNumber,
      numberOfPartitions,
      pushType,
      remoteKafkaBootstrapServers,
      rewindTimeInSecondsOverride,
      replicationMetadataVersionId,
      versionSwapDeferred,
      noTargetedRegions,
      repushSourceVersion);
}

/**
* This is a wrapper for VeniceHelixAdmin#addVersion but performs additional operations needed for add version invoked
* from the admin channel. Therefore, this method is mainly invoked from the admin task upon processing an add
* version message.
*/
@Override
public void addVersionAndStartIngestion(
String clusterName,
String storeName,
Expand All @@ -2003,6 +2030,7 @@ public void addVersionAndStartIngestion(
long rewindTimeInSecondsOverride,
int replicationMetadataVersionId,
boolean versionSwapDeferred,
String targetedRegions,
int repushSourceVersion) {
Store store = getStore(clusterName, storeName);
if (store == null) {
Expand Down Expand Up @@ -2053,6 +2081,7 @@ public void addVersionAndStartIngestion(
replicationMetadataVersionId,
Optional.empty(),
versionSwapDeferred,
targetedRegions,
repushSourceVersion);
}

Expand Down Expand Up @@ -2804,6 +2833,10 @@ private Pair<Boolean, Version> addVersion(
version.setRepushSourceVersion(repushSourceVersion);
}

if (versionSwapDeferred && StringUtils.isNotEmpty(targetedRegions)) {
version.setTargetSwapRegion(targetedRegions);
}

Properties veniceViewProperties = new Properties();
veniceViewProperties.put(PARTITION_COUNT, numberOfPartitions);
veniceViewProperties.put(USE_FAST_KAFKA_OPERATION_TIMEOUT, useFastKafkaOperationTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,9 @@ private void handleAddVersion(AddVersion message) {
} else {
boolean skipConsumption = message.targetedRegions != null && !message.targetedRegions.isEmpty()
&& message.targetedRegions.stream().map(Object::toString).noneMatch(regionName::equals);
if (skipConsumption) {
boolean isTargetRegionPushWithDeferredSwap = message.targetedRegions != null && message.versionSwapDeferred;
String targetedRegions = message.targetedRegions != null ? String.join(",", message.targetedRegions) : "";
if (skipConsumption && !isTargetRegionPushWithDeferredSwap) {
// for targeted region push, only allow specified region to process add version message
LOGGER.info(
"Skip the add version message for store {} in region {} since this is targeted region push and "
Expand All @@ -687,6 +689,7 @@ private void handleAddVersion(AddVersion message) {
rewindTimeInSecondsOverride,
replicationMetadataVersionId,
message.versionSwapDeferred,
targetedRegions,
repushSourceVersion);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.linkedin.venice.pushstatushelper.PushStatusStoreReader;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.RegionUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
Expand Down Expand Up @@ -88,6 +89,7 @@ public abstract class AbstractPushMonitor
private final boolean isOfflinePushMonitorDaVinciPushStatusEnabled;

private final DisabledPartitionStats disabledPartitionStats;
private final String regionName;

public AbstractPushMonitor(
String clusterName,
Expand Down Expand Up @@ -134,6 +136,7 @@ public AbstractPushMonitor(
controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceRatio(),
controllerConfig.useDaVinciSpecificExecutionStatusForError());
this.isOfflinePushMonitorDaVinciPushStatusEnabled = controllerConfig.isDaVinciPushStatusEnabled();
this.regionName = controllerConfig.getRegionName();
pushStatusCollector.start();
}

Expand Down Expand Up @@ -1120,15 +1123,37 @@ private void updateStoreVersionStatus(String storeName, int versionNumber, Versi
storeName,
versionNumber));
}
if (version.isVersionSwapDeferred()) {

/**
* Switch to the new version if:
* 1.deferred version swap is not enabled and it is not a target region push w/ deferred version swap
* 2.target region push w/ deferred swap is enabled and the current region matches the target region
*
* Do not switch to the new version now if:
* 1.deferred version swap is enabled (it will be manually swapped at a later date)
* 2.target region push is enabled and the current region does NOT match the target region (it will be swapped
* after targetSwapRegionWaitTime passes)
*/
Set<String> targetRegions = RegionUtils.parseRegionsFilterList(version.getTargetSwapRegion());
boolean isValidTargetRegion = targetRegions.contains(regionName);
boolean isTargetRegionPushWithDeferredSwap = version.isVersionSwapDeferred() && isValidTargetRegion;
misyel marked this conversation as resolved.
Show resolved Hide resolved
boolean isNormalPush = !version.isVersionSwapDeferred();
boolean isDeferredSwap = version.isVersionSwapDeferred() && targetRegions.isEmpty();
if (isTargetRegionPushWithDeferredSwap || isNormalPush) {
int previousVersion = store.getCurrentVersion();
misyel marked this conversation as resolved.
Show resolved Hide resolved
store.setCurrentVersion(versionNumber);
realTimeTopicSwitcher.transmitVersionSwapMessage(store, previousVersion, versionNumber);
} else if (isDeferredSwap) {
LOGGER.info(
"Version swap is deferred for store {} on version {}. Skipping version swap.",
store.getName(),
versionNumber);
} else {
int previousVersion = store.getCurrentVersion();
store.setCurrentVersion(versionNumber);
realTimeTopicSwitcher.transmitVersionSwapMessage(store, previousVersion, versionNumber);
LOGGER.info(
"Version swap is deferred for store {} on version {} in region {} due to target region push w/ deferred swap",
misyel marked this conversation as resolved.
Show resolved Hide resolved
store.getName(),
versionNumber,
regionName);
}
} else {
LOGGER.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ private void scanDaVinciPushStatus() {
} else {
topicToNoDaVinciStatusRetryCountMap.remove(pushStatus.topicName);
}

// If there is some status for a davinci push, mark that there is a dvc heartbeat reported for the latest version
String storeName = Version.parseStoreFromKafkaTopicName(pushStatus.topicName);
Store store = storeRepository.getStore(storeName);
if (!store.getIsDavinciHeartbeatReported() && daVinciStatus.getStatus() != ExecutionStatus.NOT_CREATED) {
int versionNum = Version.parseVersionFromVersionTopicName(pushStatus.topicName);
store.updateVersionForDaVinciHeartbeat(versionNum, true);
storeRepository.updateStore(store);
}

LOGGER.info(
"Received DaVinci status: {} with details: {} for topic: {}",
daVinciStatus.getStatus(),
Expand Down
Loading
Loading