diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java index 487a899ca7b..d8ff38da583 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java @@ -78,6 +78,7 @@ public class PushJobSetting implements Serializable { public boolean d2Routing; public String targetedRegions; public boolean isTargetedRegionPushEnabled; + public boolean isTargetRegionPushWithDeferredSwapEnabled; public boolean isSystemSchemaReaderEnabled; public String systemSchemaClusterD2ServiceName; public String systemSchemaClusterD2ZKHost; diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java index ad3cc4e97a4..660f2c0b234 100755 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java @@ -76,6 +76,7 @@ import static com.linkedin.venice.vpj.VenicePushJobConstants.SYSTEM_SCHEMA_READER_ENABLED; import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_ENABLED; import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_LIST; +import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP; import static com.linkedin.venice.vpj.VenicePushJobConstants.TEMP_DIR_PREFIX; import static com.linkedin.venice.vpj.VenicePushJobConstants.UNCREATED_VERSION_NUMBER; import static com.linkedin.venice.vpj.VenicePushJobConstants.USE_MAPPER_TO_BUILD_DICTIONARY; @@ -399,8 +400,24 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) { if (pushJobSettingToReturn.isIncrementalPush && pushJobSettingToReturn.isTargetedRegionPushEnabled) { throw new VeniceException("Incremental push is not supported while using targeted region push mode"); } + + // If target region push with deferred version swap is enabled, enable deferVersionSwap and + // isTargetedRegionPushEnabled + if (props.getBoolean(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, false)) { + pushJobSettingToReturn.deferVersionSwap = true; + pushJobSettingToReturn.isTargetRegionPushWithDeferredSwapEnabled = true; + } + + if (pushJobSettingToReturn.isTargetRegionPushWithDeferredSwapEnabled + && pushJobSettingToReturn.isTargetedRegionPushEnabled) { + throw new VeniceException( + "Target region push and target region push with deferred version swap cannot be enabled" + + " at the same time"); + } + if (props.containsKey(TARGETED_REGION_PUSH_LIST)) { - if (pushJobSettingToReturn.isTargetedRegionPushEnabled) { + if (pushJobSettingToReturn.isTargetedRegionPushEnabled + || pushJobSettingToReturn.isTargetRegionPushWithDeferredSwapEnabled) { pushJobSettingToReturn.targetedRegions = props.getString(TARGETED_REGION_PUSH_LIST); } else { throw new VeniceException("Targeted region push list is only supported when targeted region push is enabled"); @@ -844,14 +861,14 @@ public void run() { updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.JOB_STATUS_POLLING_COMPLETED); // Do not mark completed yet as for target region push it will be marked inside postValidationConsumption - if (!pushJobSetting.isTargetedRegionPushEnabled) { + if (!pushJobSetting.isTargetedRegionPushEnabled && !pushJobSetting.isTargetRegionPushWithDeferredSwapEnabled) { pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.COMPLETED.getValue())); } pushJobDetails.jobDurationInMs = LatencyUtils.getElapsedTimeFromMsToMs(pushJobSetting.jobStartTimeMs); sendPushJobDetailsToController(); // only kick off the validation and post-validation flow when everything has to be done in a single VPJ - if (!pushJobSetting.isTargetedRegionPushEnabled) { + if (!pushJobSetting.isTargetedRegionPushEnabled || pushJobSetting.isTargetRegionPushWithDeferredSwapEnabled) { return; } @@ -2075,14 +2092,17 @@ private void validateStoreSettingAndPopulate(ControllerClient controllerClient, jobSetting.storeStorageQuota = storeResponse.getStore().getStorageQuotaInByte(); // Do not enable for deferred swap or hybrid store - if (pushJobSetting.deferVersionSwap || storeResponse.getStore().getHybridStoreConfig() != null) { + boolean isDeferredSwap = + pushJobSetting.deferVersionSwap && !pushJobSetting.isTargetRegionPushWithDeferredSwapEnabled; + if (isDeferredSwap || storeResponse.getStore().getHybridStoreConfig() != null) { LOGGER.warn( "target region is not available for {} as it hybrid or deferred version swap enabled.", jobSetting.storeName); jobSetting.isTargetedRegionPushEnabled = false; } - if (jobSetting.isTargetedRegionPushEnabled && jobSetting.targetedRegions == null) { + if ((jobSetting.isTargetedRegionPushEnabled || jobSetting.isTargetRegionPushWithDeferredSwapEnabled) + && jobSetting.targetedRegions == null) { // only override the targeted regions if it is not set and it is a single region push // use source grid fabric as target region to reduce data hop, else use default NR source if (!StringUtils.isEmpty(jobSetting.sourceGridFabric)) { diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java index 3e172755f57..093b7842319 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java @@ -324,6 +324,15 @@ private VenicePushJobConstants() { */ public static final String TARGETED_REGION_PUSH_LIST = "targeted.region.push.list"; + /** + * Config to enable target region push with deferred version swap + * In this mode, the VPJ will push data to all regions and only switch to the new version in a single region. + * The single region is decided by the store config in {@link StoreInfo#getNativeReplicationSourceFabric()}} unless + * a list of regions is passed in from targeted.region.push.list + * After a specified wait time (default 1h), the remaining regions will switch to the new version. + */ + public static final String TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP = "targeted.region.push.with.deferred.swap"; + public static final boolean DEFAULT_IS_DUPLICATED_KEY_ALLOWED = false; /** diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java index c5161eb82a1..083606476ef 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java @@ -27,6 +27,7 @@ import static com.linkedin.venice.vpj.VenicePushJobConstants.SYSTEM_SCHEMA_READER_ENABLED; import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_ENABLED; import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_LIST; +import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP; import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_FIELD_PROP; import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_DISCOVER_URL_PROP; import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_STORE_NAME_PROP; @@ -923,6 +924,36 @@ public void testGetVeniceWriter() { } } + @Test + public void testTargetRegionPushWithDeferredSwapSettings() { + Properties props = getVpjRequiredProperties(); + String regions = "test1, test2"; + props.put(KEY_FIELD_PROP, "id"); + props.put(VALUE_FIELD_PROP, "name"); + props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true); + props.put(TARGETED_REGION_PUSH_LIST, regions); + + try (VenicePushJob pushJob = getSpyVenicePushJob(props, getClient())) { + PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); + Assert.assertEquals(pushJobSetting.deferVersionSwap, true); + Assert.assertEquals(pushJobSetting.isTargetRegionPushWithDeferredSwapEnabled, true); + Assert.assertEquals(pushJobSetting.targetedRegions, regions); + } + } + + @Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = ".*cannot be enabled at the same time.*") + public void testEnableBothTargetRegionConfigs() { + Properties props = getVpjRequiredProperties(); + String regions = "test1, test2"; + props.put(KEY_FIELD_PROP, "id"); + props.put(VALUE_FIELD_PROP, "name"); + props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true); + props.put(TARGETED_REGION_PUSH_ENABLED, true); + props.put(TARGETED_REGION_PUSH_LIST, regions); + + getSpyVenicePushJob(props, getClient()); + } + private JobStatusQueryResponse mockJobStatusQuery() { JobStatusQueryResponse response = new JobStatusQueryResponse(); response.setStatus(ExecutionStatus.COMPLETED.toString()); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java index 9c4e56a5082..b786655561e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java @@ -348,4 +348,17 @@ public void fixMissingFields() { } } } + + @Override + public void updateVersionForDaVinciHeartbeat(int versionNumber, boolean reported) { + checkVersionSupplier(); + + for (StoreVersion storeVersion: storeVersionsSupplier.getForUpdate()) { + Version version = new VersionImpl(storeVersion); + if (version.getNumber() == versionNumber) { + version.setIsDavinciHeartbeatReported(reported); + return; + } + } + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java index 5271e879152..c531e7d2b8b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java @@ -1497,6 +1497,11 @@ public boolean getIsDavinciHeartbeatReported() { return delegate.getIsDavinciHeartbeatReported(); } + @Override + public void updateVersionForDaVinciHeartbeat(int versionNumber, boolean reported) { + throw new UnsupportedOperationException(); + } + @Override public String toString() { return this.delegate.toString(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java index 8b0c5b0710a..03a4b7f9261 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java @@ -341,4 +341,6 @@ static boolean isSystemStore(String storeName) { void setIsDavinciHeartbeatReported(boolean isReported); boolean getIsDavinciHeartbeatReported(); + + void updateVersionForDaVinciHeartbeat(int versionNumber, boolean reported); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java index 37a4a3c638f..6406c3a4027 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java @@ -507,6 +507,7 @@ public Version cloneVersion() { clonedVersion.setBlobTransferEnabled(isBlobTransferEnabled()); clonedVersion.setTargetSwapRegion(getTargetSwapRegion()); clonedVersion.setTargetSwapRegionWaitTime(getTargetSwapRegionWaitTime()); + clonedVersion.setIsDavinciHeartbeatReported(getIsDavinciHeartbeatReported()); return clonedVersion; } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestZKStore.java b/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestZKStore.java index 7f58f92dd28..75d1dd03d16 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestZKStore.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestZKStore.java @@ -436,4 +436,13 @@ public void testStoreLevelAcl() { 1); Assert.assertTrue(store.isAccessControlled()); } + + @Test + public void testUpdateVersionForDaVinciHeartbeat() { + String storeName = "testUpdateVersionForDaVinciHeartbeat"; + Store store = TestUtils.createTestStore(storeName, "owner", System.currentTimeMillis()); + store.addVersion(new VersionImpl(storeName, 1, "test")); + store.updateVersionForDaVinciHeartbeat(1, true); + Assert.assertTrue(store.getVersion(1).getIsDavinciHeartbeatReported()); + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 5714633ec95..ed88c956ea7 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -75,6 +75,7 @@ import com.linkedin.venice.integration.utils.VeniceRouterWrapper; import com.linkedin.venice.meta.IngestionMetadataUpdateType; import com.linkedin.venice.meta.IngestionMode; +import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.Version; import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.partitioner.ConstantVenicePartitioner; @@ -1547,6 +1548,35 @@ public void testDVCSnapshotGeneration(boolean useDiskStorage) throws Exception { } } + @Test(timeOut = TEST_TIMEOUT) + public void testIsDavinciHeartbeatReported() throws Exception { + String storeName = Utils.getUniqueString("testIsDavinviHeartbeatReported"); + Consumer paramsConsumer = params -> params.setTargetRegionSwap("test"); + setUpStore(storeName, paramsConsumer, properties -> {}, true); + + try (DaVinciClient client = ServiceFactory.getGenericAvroDaVinciClient(storeName, cluster)) { + client.subscribeAll().get(); + } + + File inputDirectory = getTempDataDirectory(); + String inputDirectoryPath = "file://" + inputDirectory.getAbsolutePath(); + try { + writeSimpleAvroFileWithIntToStringSchema(inputDirectory); + } catch (IOException e) { + throw new RuntimeException(e); + } + Properties vpjProperties = defaultVPJProps(cluster, inputDirectoryPath, storeName); + runVPJ(vpjProperties, 2, cluster); + + try (ControllerClient controllerClient = cluster.getControllerClient()) { + StoreInfo store = controllerClient.getStore(storeName).getStore(); + TestUtils.waitForNonDeterministicAssertion(2, TimeUnit.MILLISECONDS, () -> { + Assert.assertTrue(store.getIsDavinciHeartbeatReported()); + Assert.assertTrue(store.getVersion(2).get().getIsDavinciHeartbeatReported()); + }); + } + } + private void setupHybridStore(String storeName, Consumer paramsConsumer) throws Exception { setupHybridStore(storeName, paramsConsumer, KEY_COUNT); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java index c3540d09161..a8c282b2a5e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java @@ -43,6 +43,7 @@ import static com.linkedin.venice.vpj.VenicePushJobConstants.SEND_CONTROL_MESSAGES_DIRECTLY; import static com.linkedin.venice.vpj.VenicePushJobConstants.SOURCE_KAFKA; import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_ENABLED; +import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -866,6 +867,52 @@ public void testTargetedRegionPushJobFullConsumptionForHybridStore() throws Exce }); } + @Test(timeOut = TEST_TIMEOUT) + public void testTargetRegionPushWithDeferredVersionSwap() throws Exception { + motherOfAllTests( + "testTargetRegionPushWithDeferredVersionSwap", + updateStoreQueryParams -> updateStoreQueryParams.setPartitionCount(1), + 100, + (parentControllerClient, clusterName, storeName, props, inputDir) -> { + // start a regular push job + try (VenicePushJob job = new VenicePushJob("Test regular push job", props)) { + job.run(); + + // Verify the kafka URL being returned to the push job is the same as dc-0 kafka url. + Assert.assertEquals(job.getKafkaUrl(), childDatacenters.get(0).getKafkaBrokerWrapper().getAddress()); + + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + // Current version should become 1 all data centers + for (int version: parentControllerClient.getStore(storeName) + .getStore() + .getColoToCurrentVersions() + .values()) { + Assert.assertEquals(version, 1); + } + }); + } + + // start target version push with deferred swap + props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true); + try (VenicePushJob job = new VenicePushJob("Test target region push w. deferred version swap", props)) { + job.run(); + + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + Map coloVersions = + parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions(); + + coloVersions.forEach((colo, version) -> { + if (colo.equals(DEFAULT_NATIVE_REPLICATION_SOURCE)) { + Assert.assertEquals((int) version, 2); // Version should only be swapped in dc-0 + } else { + Assert.assertEquals((int) version, 1); // The remaining regions shouldn't be swapped + } + }); + }); + } + }); + } + private interface NativeReplicationTest { void run( ControllerClient parentControllerClient, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 963aa9c9e7a..13881f873a7 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -1986,12 +1986,39 @@ private void checkPreConditionForSingleVersionDeletion( } } + @Override + public void addVersionAndStartIngestion( + String clusterName, + String storeName, + String pushJobId, + int versionNumber, + int numberOfPartitions, + PushType pushType, + String remoteKafkaBootstrapServers, + long rewindTimeInSecondsOverride, + int replicationMetadataVersionId, + boolean versionSwapDeferred, + int repushSourceVersion) { + addVersionAndStartIngestion( + clusterName, + storeName, + pushJobId, + versionNumber, + numberOfPartitions, + pushType, + remoteKafkaBootstrapServers, + rewindTimeInSecondsOverride, + replicationMetadataVersionId, + versionSwapDeferred, + null, + repushSourceVersion); + } + /** * This is a wrapper for VeniceHelixAdmin#addVersion but performs additional operations needed for add version invoked * from the admin channel. Therefore, this method is mainly invoked from the admin task upon processing an add * version message. */ - @Override public void addVersionAndStartIngestion( String clusterName, String storeName, @@ -2003,6 +2030,7 @@ public void addVersionAndStartIngestion( long rewindTimeInSecondsOverride, int replicationMetadataVersionId, boolean versionSwapDeferred, + String targetedRegions, int repushSourceVersion) { Store store = getStore(clusterName, storeName); if (store == null) { @@ -2053,6 +2081,7 @@ public void addVersionAndStartIngestion( replicationMetadataVersionId, Optional.empty(), versionSwapDeferred, + targetedRegions, repushSourceVersion); } @@ -2804,6 +2833,10 @@ private Pair addVersion( version.setRepushSourceVersion(repushSourceVersion); } + if (versionSwapDeferred && StringUtils.isNotEmpty(targetedRegions)) { + version.setTargetSwapRegion(targetedRegions); + } + Properties veniceViewProperties = new Properties(); veniceViewProperties.put(PARTITION_COUNT, numberOfPartitions); veniceViewProperties.put(USE_FAST_KAFKA_OPERATION_TIMEOUT, useFastKafkaOperationTimeout); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java index 65422002e3b..10f7a3cadb7 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java @@ -666,7 +666,9 @@ private void handleAddVersion(AddVersion message) { } else { boolean skipConsumption = message.targetedRegions != null && !message.targetedRegions.isEmpty() && message.targetedRegions.stream().map(Object::toString).noneMatch(regionName::equals); - if (skipConsumption) { + boolean isTargetRegionPushWithDeferredSwap = message.targetedRegions != null && message.versionSwapDeferred; + String targetedRegions = message.targetedRegions != null ? String.join(",", message.targetedRegions) : ""; + if (skipConsumption && !isTargetRegionPushWithDeferredSwap) { // for targeted region push, only allow specified region to process add version message LOGGER.info( "Skip the add version message for store {} in region {} since this is targeted region push and " @@ -687,6 +689,7 @@ private void handleAddVersion(AddVersion message) { rewindTimeInSecondsOverride, replicationMetadataVersionId, message.versionSwapDeferred, + targetedRegions, repushSourceVersion); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index 9a5a78e61f0..30a6be4d424 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -31,6 +31,7 @@ import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; import com.linkedin.venice.throttle.EventThrottler; import com.linkedin.venice.utils.HelixUtils; +import com.linkedin.venice.utils.RegionUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.locks.AutoCloseableLock; @@ -88,6 +89,7 @@ public abstract class AbstractPushMonitor private final boolean isOfflinePushMonitorDaVinciPushStatusEnabled; private final DisabledPartitionStats disabledPartitionStats; + private final String regionName; public AbstractPushMonitor( String clusterName, @@ -134,6 +136,7 @@ public AbstractPushMonitor( controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceRatio(), controllerConfig.useDaVinciSpecificExecutionStatusForError()); this.isOfflinePushMonitorDaVinciPushStatusEnabled = controllerConfig.isDaVinciPushStatusEnabled(); + this.regionName = controllerConfig.getRegionName(); pushStatusCollector.start(); } @@ -1120,15 +1123,41 @@ private void updateStoreVersionStatus(String storeName, int versionNumber, Versi storeName, versionNumber)); } - if (version.isVersionSwapDeferred()) { + + /** + * Switch to the new version if: + * 1.deferred version swap is not enabled and it is not a target region push w/ deferred version swap + * 2.target region push w/ deferred swap is enabled and the current region matches the target region + * + * Do not switch to the new version now if: + * 1.deferred version swap is enabled (it will be manually swapped at a later date) + * 2.target region push is enabled and the current region does NOT match the target region (it will be swapped + * after targetSwapRegionWaitTime passes) + */ + Set targetRegions = RegionUtils.parseRegionsFilterList(version.getTargetSwapRegion()); + boolean isTargetRegionPushWithDeferredSwap = + version.isVersionSwapDeferred() && targetRegions.contains(regionName); + boolean isNormalPush = !version.isVersionSwapDeferred(); + boolean isDeferredSwap = version.isVersionSwapDeferred() && targetRegions.isEmpty(); + if (isTargetRegionPushWithDeferredSwap || isNormalPush) { LOGGER.info( - "Version swap is deferred for store {} on version {}. Skipping version swap.", + "Swapping to version {} for store {} in region {} during " + + (isNormalPush ? "normal push" : "target region push with deferred version swap"), + versionNumber, store.getName(), - versionNumber); - } else { + regionName, + isTargetRegionPushWithDeferredSwap, + isNormalPush); int previousVersion = store.getCurrentVersion(); store.setCurrentVersion(versionNumber); realTimeTopicSwitcher.transmitVersionSwapMessage(store, previousVersion, versionNumber); + } else { + LOGGER.info( + "Version swap is deferred for store {} on version {} in region {} because " + + (isDeferredSwap ? "deferred version swap is enabled" : "it is not in the target regions"), + store.getName(), + versionNumber, + regionName); } } else { LOGGER.info( diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java index 7faa595a544..c1981529ac5 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java @@ -171,6 +171,17 @@ private void scanDaVinciPushStatus() { } else { topicToNoDaVinciStatusRetryCountMap.remove(pushStatus.topicName); } + + // If there is some status for a davinci push, mark that there is a dvc heartbeat reported for the latest version + String storeName = Version.parseStoreFromKafkaTopicName(pushStatus.topicName); + Store store = storeRepository.getStore(storeName); + if (!store.getIsDavinciHeartbeatReported() && daVinciStatus.getStatus() != ExecutionStatus.NOT_CREATED) { + int versionNum = Version.parseVersionFromVersionTopicName(pushStatus.topicName); + store.updateVersionForDaVinciHeartbeat(versionNum, true); + store.setIsDavinciHeartbeatReported(true); + storeRepository.updateStore(store); + } + LOGGER.info( "Received DaVinci status: {} with details: {} for topic: {}", daVinciStatus.getStatus(), diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java index be38cb0311b..3c28e99c282 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java @@ -1162,6 +1162,7 @@ public void testResubscribeWithFailedAdminMessages() throws ExecutionException, -1, 1, false, + "", 0); // isLeaderController() is called once every consumption cycle (1000ms) and for every message processed in // AdminExecutionTask. @@ -1319,6 +1320,7 @@ public void testRetriableConsumptionException() -1, 1, false, + "", 0); Future future = veniceWriter.put( emptyKeyBytes, @@ -1425,17 +1427,41 @@ public void testAddVersionMsgHandlingForTargetedRegionPush() throws Exception { // dc-0 is the default region for the testing suite so the message for "dc-1" and "dc-2" should be ignored. veniceWriter.put( emptyKeyBytes, - getAddVersionMessage(clusterName, storeName, mockPushJobId, versionNumber, numberOfPartitions, 1L, "dc-1"), + getAddVersionMessage( + clusterName, + storeName, + mockPushJobId, + versionNumber, + numberOfPartitions, + 1L, + "dc-1", + false), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION); veniceWriter.put( emptyKeyBytes, - getAddVersionMessage(clusterName, storeName, mockPushJobId, versionNumber, numberOfPartitions, 2L, "dc-2"), + getAddVersionMessage( + clusterName, + storeName, + mockPushJobId, + versionNumber, + numberOfPartitions, + 2L, + "dc-2", + false), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION); veniceWriter.put( emptyKeyBytes, - getAddVersionMessage(clusterName, storeName, mockPushJobId, versionNumber, numberOfPartitions, 3L, "dc-0"), + getAddVersionMessage( + clusterName, + storeName, + mockPushJobId, + versionNumber, + numberOfPartitions, + 3L, + "dc-0", + false), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION); TestUtils.waitForNonDeterministicAssertion(TIMEOUT, TimeUnit.MILLISECONDS, () -> { @@ -1450,6 +1476,52 @@ public void testAddVersionMsgHandlingForTargetedRegionPush() throws Exception { -1, 1, false, + "dc-0", + 0); + }); + + task.close(); + executor.shutdown(); + executor.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS); + } + + @Test + public void testAddVersionMsgHandlingForTargetedRegionPushWithDeferredSwap() throws Exception { + AdminConsumptionStats stats = mock(AdminConsumptionStats.class); + AdminConsumptionTask task = getAdminConsumptionTask(new RandomPollStrategy(), false, stats, 10000); + executor.submit(task); + String mockPushJobId = "testAddVersionMsgHandlingForTargetedRegionPushWithDeferredSwap"; + int versionNumber = 1; + int numberOfPartitions = 1; + + veniceWriter.put( + emptyKeyBytes, + getAddVersionMessage( + clusterName, + storeName, + mockPushJobId, + versionNumber, + numberOfPartitions, + 1L, + "dc-1", + true), + AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION); + + // dc-0 is the default region, but with target region swap w/ deferred swap enabled, dc-1 should still ingest the + // data + TestUtils.waitForNonDeterministicAssertion(TIMEOUT, TimeUnit.MILLISECONDS, () -> { + verify(admin, times(1)).addVersionAndStartIngestion( + clusterName, + storeName, + mockPushJobId, + versionNumber, + numberOfPartitions, + Version.PushType.BATCH, + null, + -1, + 1, + true, + "dc-1", 0); }); @@ -1500,7 +1572,15 @@ private byte[] getAddVersionMessage( int versionNum, int numberOfPartitions, long executionId) { - return getAddVersionMessage(clusterName, storeName, pushJobId, versionNum, numberOfPartitions, executionId, null); + return getAddVersionMessage( + clusterName, + storeName, + pushJobId, + versionNum, + numberOfPartitions, + executionId, + null, + false); } private byte[] getAddVersionMessage( @@ -1510,7 +1590,8 @@ private byte[] getAddVersionMessage( int versionNum, int numberOfPartitions, long executionId, - String targetedRegions) { + String targetedRegions, + boolean deferredSwap) { AddVersion addVersion = (AddVersion) AdminMessageType.ADD_VERSION.getNewInstance(); addVersion.clusterName = clusterName; addVersion.storeName = storeName; @@ -1519,6 +1600,7 @@ private byte[] getAddVersionMessage( addVersion.numberOfPartitions = numberOfPartitions; addVersion.rewindTimeInSecondsOverride = -1; addVersion.timestampMetadataVersionId = 1; + addVersion.versionSwapDeferred = deferredSwap; if (targetedRegions != null) { addVersion.targetedRegions = new ArrayList<>(RegionUtils.parseRegionsFilterList(targetedRegions)); }