Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[vpj][controller] Add deferred swap for target region push #1375

Merged
merged 20 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class PushJobSetting implements Serializable {
public boolean d2Routing;
public String targetedRegions;
public boolean isTargetedRegionPushEnabled;
public boolean isTargetRegionPushWithDeferredSwapEnabled;
public boolean isSystemSchemaReaderEnabled;
public String systemSchemaClusterD2ServiceName;
public String systemSchemaClusterD2ZKHost;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import static com.linkedin.venice.vpj.VenicePushJobConstants.SYSTEM_SCHEMA_READER_ENABLED;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_ENABLED;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_LIST;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TEMP_DIR_PREFIX;
import static com.linkedin.venice.vpj.VenicePushJobConstants.UNCREATED_VERSION_NUMBER;
import static com.linkedin.venice.vpj.VenicePushJobConstants.USE_MAPPER_TO_BUILD_DICTIONARY;
Expand Down Expand Up @@ -399,6 +400,15 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) {
if (pushJobSettingToReturn.isIncrementalPush && pushJobSettingToReturn.isTargetedRegionPushEnabled) {
throw new VeniceException("Incremental push is not supported while using targeted region push mode");
}

// If target region push with deferred version swap is enabled, enable deferVersionSwap and
// isTargetedRegionPushEnabled
if (props.getBoolean(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, false)) {
pushJobSettingToReturn.deferVersionSwap = true;
pushJobSettingToReturn.isTargetedRegionPushEnabled = true;
pushJobSettingToReturn.isTargetRegionPushWithDeferredSwapEnabled = true;
}

if (props.containsKey(TARGETED_REGION_PUSH_LIST)) {
if (pushJobSettingToReturn.isTargetedRegionPushEnabled) {
pushJobSettingToReturn.targetedRegions = props.getString(TARGETED_REGION_PUSH_LIST);
Expand Down Expand Up @@ -851,7 +861,7 @@ public void run() {
sendPushJobDetailsToController();

// only kick off the validation and post-validation flow when everything has to be done in a single VPJ
if (!pushJobSetting.isTargetedRegionPushEnabled) {
if (!pushJobSetting.isTargetedRegionPushEnabled || pushJobSetting.isTargetRegionPushWithDeferredSwapEnabled) {
return;
}

Expand Down Expand Up @@ -2075,7 +2085,8 @@ private void validateStoreSettingAndPopulate(ControllerClient controllerClient,
jobSetting.storeStorageQuota = storeResponse.getStore().getStorageQuotaInByte();

// Do not enable for deferred swap or hybrid store
if (pushJobSetting.deferVersionSwap || storeResponse.getStore().getHybridStoreConfig() != null) {
boolean isDeferredSwap = pushJobSetting.deferVersionSwap && !pushJobSetting.isTargetedRegionPushEnabled;
if (isDeferredSwap || storeResponse.getStore().getHybridStoreConfig() != null) {
LOGGER.warn(
"target region is not available for {} as it hybrid or deferred version swap enabled.",
jobSetting.storeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,15 @@ private VenicePushJobConstants() {
*/
public static final String TARGETED_REGION_PUSH_LIST = "targeted.region.push.list";

/**
* Config to enable target region push with deferred version swap
* In this mode, the VPJ will push data to all regions and only switch to the new version in a single region.
* The single region is decided by the store config in {@link StoreInfo#getNativeReplicationSourceFabric()}} unless
* a list of regions is passed in from targeted.region.push.list
* After a specified wait time (default 1h), the remaining regions will switch to the new version.
*/
public static final String TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP = "targeted.region.push.with.deferred.swap";

public static final boolean DEFAULT_IS_DUPLICATED_KEY_ALLOWED = false;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static com.linkedin.venice.vpj.VenicePushJobConstants.SYSTEM_SCHEMA_READER_ENABLED;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_ENABLED;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_LIST;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_FIELD_PROP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_DISCOVER_URL_PROP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_STORE_NAME_PROP;
Expand Down Expand Up @@ -923,6 +924,23 @@ public void testGetVeniceWriter() {
}
}

@Test
public void testTargetRegionPushWithDeferredSwapSettings() {
Properties props = getVpjRequiredProperties();
String regions = "test1, test2";
props.put(KEY_FIELD_PROP, "id");
props.put(VALUE_FIELD_PROP, "name");
props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true);
props.put(TARGETED_REGION_PUSH_LIST, regions);

try (VenicePushJob pushJob = getSpyVenicePushJob(props, getClient())) {
PushJobSetting pushJobSetting = pushJob.getPushJobSetting();
Assert.assertEquals(pushJobSetting.deferVersionSwap, true);
Assert.assertEquals(pushJobSetting.isTargetedRegionPushEnabled, true);
Assert.assertEquals(pushJobSetting.targetedRegions, regions);
}
}

private JobStatusQueryResponse mockJobStatusQuery() {
JobStatusQueryResponse response = new JobStatusQueryResponse();
response.setStatus(ExecutionStatus.COMPLETED.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,4 +348,17 @@ public void fixMissingFields() {
}
}
}

@Override
public void updateVersionForDaVinciHeartbeat(int versionNumber, boolean reported) {
checkVersionSupplier();

for (StoreVersion storeVersion: storeVersionsSupplier.getForUpdate()) {
Version version = new VersionImpl(storeVersion);
if (version.getNumber() == versionNumber) {
version.setIsDavinciHeartbeatReported(reported);
return;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1497,6 +1497,11 @@ public boolean getIsDavinciHeartbeatReported() {
return delegate.getIsDavinciHeartbeatReported();
}

@Override
public void updateVersionForDaVinciHeartbeat(int versionNumber, boolean reported) {
throw new UnsupportedOperationException();
}

@Override
public String toString() {
return this.delegate.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,4 +341,6 @@ static boolean isSystemStore(String storeName) {
void setIsDavinciHeartbeatReported(boolean isReported);

boolean getIsDavinciHeartbeatReported();

void updateVersionForDaVinciHeartbeat(int versionNumber, boolean reported);
majisourav99 marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ public Version cloneVersion() {
clonedVersion.setBlobTransferEnabled(isBlobTransferEnabled());
clonedVersion.setTargetSwapRegion(getTargetSwapRegion());
clonedVersion.setTargetSwapRegionWaitTime(getTargetSwapRegionWaitTime());
clonedVersion.setIsDavinciHeartbeatReported(getIsDavinciHeartbeatReported());
return clonedVersion;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,4 +436,13 @@ public void testStoreLevelAcl() {
1);
Assert.assertTrue(store.isAccessControlled());
}

@Test
public void testUpdateVersionForDaVinciHeartbeat() {
String storeName = "testUpdateVersionForDaVinciHeartbeat";
Store store = TestUtils.createTestStore(storeName, "owner", System.currentTimeMillis());
store.addVersion(new VersionImpl(storeName, 1, "test"));
store.updateVersionForDaVinciHeartbeat(1, true);
Assert.assertTrue(store.getVersion(1).getIsDavinciHeartbeatReported());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.meta.IngestionMetadataUpdateType;
import com.linkedin.venice.meta.IngestionMode;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.partitioner.ConstantVenicePartitioner;
Expand Down Expand Up @@ -1547,6 +1548,39 @@ public void testDVCSnapshotGeneration(boolean useDiskStorage) throws Exception {
}
}

@Test(timeOut = TEST_TIMEOUT)
public void testIsDavinciHeartbeatReported() throws Exception {
String storeName = Utils.getUniqueString("testIsDavinviHeartbeatReported");
Consumer<UpdateStoreQueryParams> paramsConsumer = params -> params.setTargetRegionSwap("test");
setUpStore(storeName, paramsConsumer, properties -> {}, true);

try (DaVinciClient<Object, Object> client = ServiceFactory.getGenericAvroDaVinciClient(storeName, cluster)) {
client.subscribeAll().get();
}

File inputDirectory = getTempDataDirectory();
String inputDirectoryPath = "file://" + inputDirectory.getAbsolutePath();
try {
writeSimpleAvroFileWithIntToStringSchema(inputDirectory);
} catch (IOException e) {
throw new RuntimeException(e);
}
Properties vpjProperties = defaultVPJProps(cluster, inputDirectoryPath, storeName);
runVPJ(vpjProperties, 2, cluster);

try (ControllerClient controllerClient = cluster.getControllerClient()) {
StoreInfo store = controllerClient.getStore(storeName).getStore();
TestUtils.waitForNonDeterministicAssertion(
misyel marked this conversation as resolved.
Show resolved Hide resolved
2,
TimeUnit.MILLISECONDS,
() -> Assert.assertEquals(store.getIsDavinciHeartbeatReported(), true));
TestUtils.waitForNonDeterministicAssertion(
2,
TimeUnit.MILLISECONDS,
() -> Assert.assertEquals(store.getVersion(2).get().getIsDavinciHeartbeatReported(), true));
}
}

private void setupHybridStore(String storeName, Consumer<UpdateStoreQueryParams> paramsConsumer) throws Exception {
setupHybridStore(storeName, paramsConsumer, KEY_COUNT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static com.linkedin.venice.vpj.VenicePushJobConstants.SEND_CONTROL_MESSAGES_DIRECTLY;
import static com.linkedin.venice.vpj.VenicePushJobConstants.SOURCE_KAFKA;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_ENABLED;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;

Expand Down Expand Up @@ -866,6 +867,52 @@ public void testTargetedRegionPushJobFullConsumptionForHybridStore() throws Exce
});
}

@Test(timeOut = TEST_TIMEOUT)
public void testTargetRegionPushWithDeferredVersionSwap() throws Exception {
motherOfAllTests(
"testTargetRegionPushWithDeferredVersionSwap",
updateStoreQueryParams -> updateStoreQueryParams.setPartitionCount(1),
100,
(parentControllerClient, clusterName, storeName, props, inputDir) -> {
// start a regular push job
try (VenicePushJob job = new VenicePushJob("Test regular push job", props)) {
job.run();

// Verify the kafka URL being returned to the push job is the same as dc-0 kafka url.
Assert.assertEquals(job.getKafkaUrl(), childDatacenters.get(0).getKafkaBrokerWrapper().getAddress());

TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
// Current version should become 1 all data centers
for (int version: parentControllerClient.getStore(storeName)
.getStore()
.getColoToCurrentVersions()
.values()) {
Assert.assertEquals(version, 1);
}
});
}

// start target version push with deferred swap
props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true);
try (VenicePushJob job = new VenicePushJob("Test target region push w. deferred version swap", props)) {
job.run();

TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
Map<String, Integer> coloVersions =
parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions();

coloVersions.forEach((colo, version) -> {
if (colo.equals(DEFAULT_NATIVE_REPLICATION_SOURCE)) {
Assert.assertEquals((int) version, 2); // Version should only be swapped in dc-0
} else {
Assert.assertEquals((int) version, 1); // The remaining regions shouldn't be swapped
}
});
});
}
});
}

private interface NativeReplicationTest {
void run(
ControllerClient parentControllerClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1986,12 +1986,39 @@ private void checkPreConditionForSingleVersionDeletion(
}
}

@Override
public void addVersionAndStartIngestion(
String clusterName,
String storeName,
String pushJobId,
int versionNumber,
int numberOfPartitions,
PushType pushType,
String remoteKafkaBootstrapServers,
long rewindTimeInSecondsOverride,
int replicationMetadataVersionId,
boolean versionSwapDeferred,
int repushSourceVersion) {
addVersionAndStartIngestion(
clusterName,
storeName,
pushJobId,
versionNumber,
numberOfPartitions,
pushType,
remoteKafkaBootstrapServers,
rewindTimeInSecondsOverride,
replicationMetadataVersionId,
versionSwapDeferred,
null,
repushSourceVersion);
}

/**
* This is a wrapper for VeniceHelixAdmin#addVersion but performs additional operations needed for add version invoked
* from the admin channel. Therefore, this method is mainly invoked from the admin task upon processing an add
* version message.
*/
@Override
public void addVersionAndStartIngestion(
String clusterName,
String storeName,
Expand All @@ -2003,6 +2030,7 @@ public void addVersionAndStartIngestion(
long rewindTimeInSecondsOverride,
int replicationMetadataVersionId,
boolean versionSwapDeferred,
String targetedRegions,
int repushSourceVersion) {
Store store = getStore(clusterName, storeName);
if (store == null) {
Expand Down Expand Up @@ -2053,6 +2081,7 @@ public void addVersionAndStartIngestion(
replicationMetadataVersionId,
Optional.empty(),
versionSwapDeferred,
targetedRegions,
repushSourceVersion);
}

Expand Down Expand Up @@ -2804,6 +2833,10 @@ private Pair<Boolean, Version> addVersion(
version.setRepushSourceVersion(repushSourceVersion);
}

if (versionSwapDeferred && StringUtils.isNotEmpty(targetedRegions)) {
version.setTargetSwapRegion(targetedRegions);
}

Properties veniceViewProperties = new Properties();
veniceViewProperties.put(PARTITION_COUNT, numberOfPartitions);
veniceViewProperties.put(USE_FAST_KAFKA_OPERATION_TIMEOUT, useFastKafkaOperationTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,9 @@ private void handleAddVersion(AddVersion message) {
} else {
boolean skipConsumption = message.targetedRegions != null && !message.targetedRegions.isEmpty()
&& message.targetedRegions.stream().map(Object::toString).noneMatch(regionName::equals);
if (skipConsumption) {
boolean isTargetRegionPushWithDeferredSwap = message.targetedRegions != null && message.versionSwapDeferred;
String targetedRegions = message.targetedRegions != null ? String.join(",", message.targetedRegions) : "";
if (skipConsumption && !isTargetRegionPushWithDeferredSwap) {
// for targeted region push, only allow specified region to process add version message
LOGGER.info(
"Skip the add version message for store {} in region {} since this is targeted region push and "
Expand All @@ -687,6 +689,7 @@ private void handleAddVersion(AddVersion message) {
rewindTimeInSecondsOverride,
replicationMetadataVersionId,
message.versionSwapDeferred,
targetedRegions,
repushSourceVersion);
}
}
Expand Down
Loading
Loading