From 6937992827d14c815264dd4e39f490e117ecaf08 Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Thu, 5 Dec 2024 15:22:32 -0800 Subject: [PATCH 01/20] add target region push w/ deferred version swap to push job --- .../linkedin/venice/hadoop/VenicePushJob.java | 14 +++- .../venice/vpj/VenicePushJobConstants.java | 8 ++ .../venice/hadoop/VenicePushJobTest.java | 15 ++++ .../TestPushJobWithNativeReplication.java | 47 +++++++++++ .../venice/controller/VeniceHelixAdmin.java | 79 ++++++++----------- .../kafka/consumer/AdminExecutionTask.java | 5 +- .../pushmonitor/AbstractPushMonitor.java | 33 +++++++- 7 files changed, 150 insertions(+), 51 deletions(-) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java index ad3cc4e97a4..aa6c7342f8e 100755 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java @@ -76,6 +76,7 @@ import static com.linkedin.venice.vpj.VenicePushJobConstants.SYSTEM_SCHEMA_READER_ENABLED; import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_ENABLED; import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_LIST; +import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP; import static com.linkedin.venice.vpj.VenicePushJobConstants.TEMP_DIR_PREFIX; import static com.linkedin.venice.vpj.VenicePushJobConstants.UNCREATED_VERSION_NUMBER; import static com.linkedin.venice.vpj.VenicePushJobConstants.USE_MAPPER_TO_BUILD_DICTIONARY; @@ -502,6 +503,13 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) { pushJobSettingToReturn.dataWriterComputeJobClass = objectClass; } + // If target region push with deferred version swap is enabled, enable deferVersionSwap and + // isTargetedRegionPushEnabled + if (props.getBoolean(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, false)) { + pushJobSettingToReturn.deferVersionSwap = true; + pushJobSettingToReturn.isTargetedRegionPushEnabled = true; + } + return pushJobSettingToReturn; } @@ -851,7 +859,8 @@ public void run() { sendPushJobDetailsToController(); // only kick off the validation and post-validation flow when everything has to be done in a single VPJ - if (!pushJobSetting.isTargetedRegionPushEnabled) { + if (!pushJobSetting.isTargetedRegionPushEnabled + || (pushJobSetting.deferVersionSwap && pushJobSetting.isTargetedRegionPushEnabled)) { return; } @@ -2075,7 +2084,8 @@ private void validateStoreSettingAndPopulate(ControllerClient controllerClient, jobSetting.storeStorageQuota = storeResponse.getStore().getStorageQuotaInByte(); // Do not enable for deferred swap or hybrid store - if (pushJobSetting.deferVersionSwap || storeResponse.getStore().getHybridStoreConfig() != null) { + boolean isDeferredSwap = pushJobSetting.deferVersionSwap && !pushJobSetting.isTargetedRegionPushEnabled; + if (isDeferredSwap || storeResponse.getStore().getHybridStoreConfig() != null) { LOGGER.warn( "target region is not available for {} as it hybrid or deferred version swap enabled.", jobSetting.storeName); diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java index 3e172755f57..994b0ca6071 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java @@ -324,6 +324,14 @@ private VenicePushJobConstants() { */ public static final String TARGETED_REGION_PUSH_LIST = "targeted.region.push.list"; + /** + * Config to enable target region push with deferred version swap + * In this mode, the VPJ will push data to all regions and only switch to the new version in a single region. + * The single region is decided by the store config in {@link StoreInfo#getNativeReplicationSourceFabric()}}. + * After a specified wait time (default 1h), the remaining regions will switch to the new version. + */ + public static final String TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP = "targeted.region.push.with.deferred.swap"; + public static final boolean DEFAULT_IS_DUPLICATED_KEY_ALLOWED = false; /** diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java index c5161eb82a1..190cb15a85f 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java @@ -27,6 +27,7 @@ import static com.linkedin.venice.vpj.VenicePushJobConstants.SYSTEM_SCHEMA_READER_ENABLED; import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_ENABLED; import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_LIST; +import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP; import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_FIELD_PROP; import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_DISCOVER_URL_PROP; import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_STORE_NAME_PROP; @@ -923,6 +924,20 @@ public void testGetVeniceWriter() { } } + @Test + public void testTargetRegionPushWithDeferredSwapSettings() { + Properties props = getVpjRequiredProperties(); + props.put(KEY_FIELD_PROP, "id"); + props.put(VALUE_FIELD_PROP, "name"); + props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true); + + try (VenicePushJob pushJob = getSpyVenicePushJob(props, getClient())) { + PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); + Assert.assertEquals(pushJobSetting.deferVersionSwap, true); + Assert.assertEquals(pushJobSetting.isTargetedRegionPushEnabled, true); + } + } + private JobStatusQueryResponse mockJobStatusQuery() { JobStatusQueryResponse response = new JobStatusQueryResponse(); response.setStatus(ExecutionStatus.COMPLETED.toString()); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java index c3540d09161..a8c282b2a5e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java @@ -43,6 +43,7 @@ import static com.linkedin.venice.vpj.VenicePushJobConstants.SEND_CONTROL_MESSAGES_DIRECTLY; import static com.linkedin.venice.vpj.VenicePushJobConstants.SOURCE_KAFKA; import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_ENABLED; +import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -866,6 +867,52 @@ public void testTargetedRegionPushJobFullConsumptionForHybridStore() throws Exce }); } + @Test(timeOut = TEST_TIMEOUT) + public void testTargetRegionPushWithDeferredVersionSwap() throws Exception { + motherOfAllTests( + "testTargetRegionPushWithDeferredVersionSwap", + updateStoreQueryParams -> updateStoreQueryParams.setPartitionCount(1), + 100, + (parentControllerClient, clusterName, storeName, props, inputDir) -> { + // start a regular push job + try (VenicePushJob job = new VenicePushJob("Test regular push job", props)) { + job.run(); + + // Verify the kafka URL being returned to the push job is the same as dc-0 kafka url. + Assert.assertEquals(job.getKafkaUrl(), childDatacenters.get(0).getKafkaBrokerWrapper().getAddress()); + + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + // Current version should become 1 all data centers + for (int version: parentControllerClient.getStore(storeName) + .getStore() + .getColoToCurrentVersions() + .values()) { + Assert.assertEquals(version, 1); + } + }); + } + + // start target version push with deferred swap + props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true); + try (VenicePushJob job = new VenicePushJob("Test target region push w. deferred version swap", props)) { + job.run(); + + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + Map coloVersions = + parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions(); + + coloVersions.forEach((colo, version) -> { + if (colo.equals(DEFAULT_NATIVE_REPLICATION_SOURCE)) { + Assert.assertEquals((int) version, 2); // Version should only be swapped in dc-0 + } else { + Assert.assertEquals((int) version, 1); // The remaining regions shouldn't be swapped + } + }); + }); + } + }); + } + private interface NativeReplicationTest { void run( ControllerClient parentControllerClient, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 963aa9c9e7a..0bc54787256 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -1986,12 +1986,39 @@ private void checkPreConditionForSingleVersionDeletion( } } + @Override + public void addVersionAndStartIngestion( + String clusterName, + String storeName, + String pushJobId, + int versionNumber, + int numberOfPartitions, + PushType pushType, + String remoteKafkaBootstrapServers, + long rewindTimeInSecondsOverride, + int replicationMetadataVersionId, + boolean versionSwapDeferred, + int repushSourceVersion) { + addVersionAndStartIngestion( + clusterName, + storeName, + pushJobId, + versionNumber, + numberOfPartitions, + pushType, + remoteKafkaBootstrapServers, + rewindTimeInSecondsOverride, + replicationMetadataVersionId, + versionSwapDeferred, + null, + repushSourceVersion); + } + /** * This is a wrapper for VeniceHelixAdmin#addVersion but performs additional operations needed for add version invoked * from the admin channel. Therefore, this method is mainly invoked from the admin task upon processing an add * version message. */ - @Override public void addVersionAndStartIngestion( String clusterName, String storeName, @@ -2003,6 +2030,7 @@ public void addVersionAndStartIngestion( long rewindTimeInSecondsOverride, int replicationMetadataVersionId, boolean versionSwapDeferred, + String targetedRegions, int repushSourceVersion) { Store store = getStore(clusterName, storeName); if (store == null) { @@ -2053,6 +2081,7 @@ public void addVersionAndStartIngestion( replicationMetadataVersionId, Optional.empty(), versionSwapDeferred, + targetedRegions, repushSourceVersion); } @@ -2490,49 +2519,6 @@ private void createBatchTopics( useFastKafkaOperationTimeout)); } - private Pair addVersion( - String clusterName, - String storeName, - String pushJobId, - int versionNumber, - int numberOfPartitions, - int replicationFactor, - boolean startIngestion, - boolean sendStartOfPush, - boolean sorted, - boolean useFastKafkaOperationTimeout, - PushType pushType, - String compressionDictionary, - String remoteKafkaBootstrapServers, - Optional sourceGridFabric, - long rewindTimeInSecondsOverride, - int replicationMetadataVersionId, - Optional emergencySourceRegion, - boolean versionSwapDeferred, - int repushSourceVersion) { - return addVersion( - clusterName, - storeName, - pushJobId, - versionNumber, - numberOfPartitions, - replicationFactor, - startIngestion, - sendStartOfPush, - sorted, - useFastKafkaOperationTimeout, - pushType, - compressionDictionary, - remoteKafkaBootstrapServers, - sourceGridFabric, - rewindTimeInSecondsOverride, - replicationMetadataVersionId, - emergencySourceRegion, - versionSwapDeferred, - null, - repushSourceVersion); - } - private Optional getVersionFromSourceCluster( ReadWriteStoreRepository repository, String destinationCluster, @@ -2804,6 +2790,10 @@ private Pair addVersion( version.setRepushSourceVersion(repushSourceVersion); } + if (versionSwapDeferred && StringUtils.isNotEmpty(targetedRegions)) { + version.setTargetSwapRegion(targetedRegions); + } + Properties veniceViewProperties = new Properties(); veniceViewProperties.put(PARTITION_COUNT, numberOfPartitions); veniceViewProperties.put(USE_FAST_KAFKA_OPERATION_TIMEOUT, useFastKafkaOperationTimeout); @@ -3061,6 +3051,7 @@ public Version incrementVersionIdempotent( replicationMetadataVersionId, emergencySourceRegion, versionSwapDeferred, + null, repushSourceVersion).getSecond(); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java index 65422002e3b..3b692e67ad8 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java @@ -666,7 +666,9 @@ private void handleAddVersion(AddVersion message) { } else { boolean skipConsumption = message.targetedRegions != null && !message.targetedRegions.isEmpty() && message.targetedRegions.stream().map(Object::toString).noneMatch(regionName::equals); - if (skipConsumption) { + boolean isTargetRegionPushWithDeferredSwap = message.targetedRegions != null && message.versionSwapDeferred; + String targetedRegions = message.targetedRegions != null ? String.join(",", message.targetedRegions) : ""; + if (skipConsumption && isTargetRegionPushWithDeferredSwap) { // for targeted region push, only allow specified region to process add version message LOGGER.info( "Skip the add version message for store {} in region {} since this is targeted region push and " @@ -687,6 +689,7 @@ private void handleAddVersion(AddVersion message) { rewindTimeInSecondsOverride, replicationMetadataVersionId, message.versionSwapDeferred, + targetedRegions, repushSourceVersion); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index 9a5a78e61f0..56e5939fe39 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -31,6 +31,7 @@ import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; import com.linkedin.venice.throttle.EventThrottler; import com.linkedin.venice.utils.HelixUtils; +import com.linkedin.venice.utils.RegionUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.locks.AutoCloseableLock; @@ -88,6 +89,7 @@ public abstract class AbstractPushMonitor private final boolean isOfflinePushMonitorDaVinciPushStatusEnabled; private final DisabledPartitionStats disabledPartitionStats; + private final String regionName; public AbstractPushMonitor( String clusterName, @@ -134,6 +136,7 @@ public AbstractPushMonitor( controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceRatio(), controllerConfig.useDaVinciSpecificExecutionStatusForError()); this.isOfflinePushMonitorDaVinciPushStatusEnabled = controllerConfig.isDaVinciPushStatusEnabled(); + this.regionName = controllerConfig.getRegionName(); pushStatusCollector.start(); } @@ -1120,15 +1123,37 @@ private void updateStoreVersionStatus(String storeName, int versionNumber, Versi storeName, versionNumber)); } - if (version.isVersionSwapDeferred()) { + + /** + * Switch to the new version if: + * 1.deferred version swap is not enabled and it is not a target region push w/ deferred version swap + * 2.target region push w/ deferred swap is enabled and the current region matches the target region + * + * Do not switch to the new version now if: + * 1.deferred version swap is enabled (it will be manually swapped at a later date) + * 2.target region push is enabled and the current region does NOT match the target region (it will be swapped + * after targetSwapRegionWaitTime passes) + */ + Set targetRegions = RegionUtils.parseRegionsFilterList(version.getTargetSwapRegion()); + boolean isValidTargetRegion = targetRegions.contains(regionName); + boolean isTargetRegionPushWithDeferredSwap = version.isVersionSwapDeferred() && isValidTargetRegion; + boolean isNormalPush = !version.isVersionSwapDeferred(); + boolean isDeferredSwap = version.isVersionSwapDeferred() && targetRegions.isEmpty(); + if (isTargetRegionPushWithDeferredSwap || isNormalPush) { + int previousVersion = store.getCurrentVersion(); + store.setCurrentVersion(versionNumber); + realTimeTopicSwitcher.transmitVersionSwapMessage(store, previousVersion, versionNumber); + } else if (isDeferredSwap) { LOGGER.info( "Version swap is deferred for store {} on version {}. Skipping version swap.", store.getName(), versionNumber); } else { - int previousVersion = store.getCurrentVersion(); - store.setCurrentVersion(versionNumber); - realTimeTopicSwitcher.transmitVersionSwapMessage(store, previousVersion, versionNumber); + LOGGER.info( + "Version swap is deferred for store {} on version {} in region {} due to target region push w/ deferred swap", + store.getName(), + versionNumber, + regionName); } } else { LOGGER.info( From 6ef9fac22fe9a84af9a002418bb24402cc9e9ce9 Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Thu, 5 Dec 2024 17:22:13 -0800 Subject: [PATCH 02/20] add setting for target region push w/ deferred swap --- .../main/java/com/linkedin/venice/hadoop/PushJobSetting.java | 1 + .../main/java/com/linkedin/venice/hadoop/VenicePushJob.java | 4 ++-- .../venice/controller/kafka/consumer/AdminExecutionTask.java | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java index 487a899ca7b..d8ff38da583 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java @@ -78,6 +78,7 @@ public class PushJobSetting implements Serializable { public boolean d2Routing; public String targetedRegions; public boolean isTargetedRegionPushEnabled; + public boolean isTargetRegionPushWithDeferredSwapEnabled; public boolean isSystemSchemaReaderEnabled; public String systemSchemaClusterD2ServiceName; public String systemSchemaClusterD2ZKHost; diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java index aa6c7342f8e..b7c846227de 100755 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java @@ -508,6 +508,7 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) { if (props.getBoolean(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, false)) { pushJobSettingToReturn.deferVersionSwap = true; pushJobSettingToReturn.isTargetedRegionPushEnabled = true; + pushJobSettingToReturn.isTargetRegionPushWithDeferredSwapEnabled = true; } return pushJobSettingToReturn; @@ -859,8 +860,7 @@ public void run() { sendPushJobDetailsToController(); // only kick off the validation and post-validation flow when everything has to be done in a single VPJ - if (!pushJobSetting.isTargetedRegionPushEnabled - || (pushJobSetting.deferVersionSwap && pushJobSetting.isTargetedRegionPushEnabled)) { + if (!pushJobSetting.isTargetedRegionPushEnabled || pushJobSetting.isTargetRegionPushWithDeferredSwapEnabled) { return; } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java index 3b692e67ad8..10f7a3cadb7 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java @@ -668,7 +668,7 @@ private void handleAddVersion(AddVersion message) { && message.targetedRegions.stream().map(Object::toString).noneMatch(regionName::equals); boolean isTargetRegionPushWithDeferredSwap = message.targetedRegions != null && message.versionSwapDeferred; String targetedRegions = message.targetedRegions != null ? String.join(",", message.targetedRegions) : ""; - if (skipConsumption && isTargetRegionPushWithDeferredSwap) { + if (skipConsumption && !isTargetRegionPushWithDeferredSwap) { // for targeted region push, only allow specified region to process add version message LOGGER.info( "Skip the add version message for store {} in region {} since this is targeted region push and " From 31e7f64d12d5888842465471a3375d19262a7757 Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Thu, 5 Dec 2024 20:01:57 -0800 Subject: [PATCH 03/20] fix tests --- .../venice/controller/VeniceHelixAdmin.java | 44 ++++++++++++++++++- .../consumer/AdminConsumptionTaskTest.java | 3 ++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 0bc54787256..13881f873a7 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -2519,6 +2519,49 @@ private void createBatchTopics( useFastKafkaOperationTimeout)); } + private Pair addVersion( + String clusterName, + String storeName, + String pushJobId, + int versionNumber, + int numberOfPartitions, + int replicationFactor, + boolean startIngestion, + boolean sendStartOfPush, + boolean sorted, + boolean useFastKafkaOperationTimeout, + PushType pushType, + String compressionDictionary, + String remoteKafkaBootstrapServers, + Optional sourceGridFabric, + long rewindTimeInSecondsOverride, + int replicationMetadataVersionId, + Optional emergencySourceRegion, + boolean versionSwapDeferred, + int repushSourceVersion) { + return addVersion( + clusterName, + storeName, + pushJobId, + versionNumber, + numberOfPartitions, + replicationFactor, + startIngestion, + sendStartOfPush, + sorted, + useFastKafkaOperationTimeout, + pushType, + compressionDictionary, + remoteKafkaBootstrapServers, + sourceGridFabric, + rewindTimeInSecondsOverride, + replicationMetadataVersionId, + emergencySourceRegion, + versionSwapDeferred, + null, + repushSourceVersion); + } + private Optional getVersionFromSourceCluster( ReadWriteStoreRepository repository, String destinationCluster, @@ -3051,7 +3094,6 @@ public Version incrementVersionIdempotent( replicationMetadataVersionId, emergencySourceRegion, versionSwapDeferred, - null, repushSourceVersion).getSecond(); } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java index be38cb0311b..20ccb70b650 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java @@ -1162,6 +1162,7 @@ public void testResubscribeWithFailedAdminMessages() throws ExecutionException, -1, 1, false, + "", 0); // isLeaderController() is called once every consumption cycle (1000ms) and for every message processed in // AdminExecutionTask. @@ -1319,6 +1320,7 @@ public void testRetriableConsumptionException() -1, 1, false, + "", 0); Future future = veniceWriter.put( emptyKeyBytes, @@ -1450,6 +1452,7 @@ public void testAddVersionMsgHandlingForTargetedRegionPush() throws Exception { -1, 1, false, + "dc-0", 0); }); From f7e4735f39233b42582ba307c9fcd6ddc8cbde02 Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Fri, 6 Dec 2024 13:34:33 -0800 Subject: [PATCH 04/20] add test for admin consumption --- .../consumer/AdminConsumptionTaskTest.java | 89 +++++++++++++++++-- 1 file changed, 84 insertions(+), 5 deletions(-) diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java index 20ccb70b650..3c28e99c282 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java @@ -1427,17 +1427,41 @@ public void testAddVersionMsgHandlingForTargetedRegionPush() throws Exception { // dc-0 is the default region for the testing suite so the message for "dc-1" and "dc-2" should be ignored. veniceWriter.put( emptyKeyBytes, - getAddVersionMessage(clusterName, storeName, mockPushJobId, versionNumber, numberOfPartitions, 1L, "dc-1"), + getAddVersionMessage( + clusterName, + storeName, + mockPushJobId, + versionNumber, + numberOfPartitions, + 1L, + "dc-1", + false), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION); veniceWriter.put( emptyKeyBytes, - getAddVersionMessage(clusterName, storeName, mockPushJobId, versionNumber, numberOfPartitions, 2L, "dc-2"), + getAddVersionMessage( + clusterName, + storeName, + mockPushJobId, + versionNumber, + numberOfPartitions, + 2L, + "dc-2", + false), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION); veniceWriter.put( emptyKeyBytes, - getAddVersionMessage(clusterName, storeName, mockPushJobId, versionNumber, numberOfPartitions, 3L, "dc-0"), + getAddVersionMessage( + clusterName, + storeName, + mockPushJobId, + versionNumber, + numberOfPartitions, + 3L, + "dc-0", + false), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION); TestUtils.waitForNonDeterministicAssertion(TIMEOUT, TimeUnit.MILLISECONDS, () -> { @@ -1461,6 +1485,51 @@ public void testAddVersionMsgHandlingForTargetedRegionPush() throws Exception { executor.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS); } + @Test + public void testAddVersionMsgHandlingForTargetedRegionPushWithDeferredSwap() throws Exception { + AdminConsumptionStats stats = mock(AdminConsumptionStats.class); + AdminConsumptionTask task = getAdminConsumptionTask(new RandomPollStrategy(), false, stats, 10000); + executor.submit(task); + String mockPushJobId = "testAddVersionMsgHandlingForTargetedRegionPushWithDeferredSwap"; + int versionNumber = 1; + int numberOfPartitions = 1; + + veniceWriter.put( + emptyKeyBytes, + getAddVersionMessage( + clusterName, + storeName, + mockPushJobId, + versionNumber, + numberOfPartitions, + 1L, + "dc-1", + true), + AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION); + + // dc-0 is the default region, but with target region swap w/ deferred swap enabled, dc-1 should still ingest the + // data + TestUtils.waitForNonDeterministicAssertion(TIMEOUT, TimeUnit.MILLISECONDS, () -> { + verify(admin, times(1)).addVersionAndStartIngestion( + clusterName, + storeName, + mockPushJobId, + versionNumber, + numberOfPartitions, + Version.PushType.BATCH, + null, + -1, + 1, + true, + "dc-1", + 0); + }); + + task.close(); + executor.shutdown(); + executor.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS); + } + private byte[] getStoreCreationMessage( String clusterName, String storeName, @@ -1503,7 +1572,15 @@ private byte[] getAddVersionMessage( int versionNum, int numberOfPartitions, long executionId) { - return getAddVersionMessage(clusterName, storeName, pushJobId, versionNum, numberOfPartitions, executionId, null); + return getAddVersionMessage( + clusterName, + storeName, + pushJobId, + versionNum, + numberOfPartitions, + executionId, + null, + false); } private byte[] getAddVersionMessage( @@ -1513,7 +1590,8 @@ private byte[] getAddVersionMessage( int versionNum, int numberOfPartitions, long executionId, - String targetedRegions) { + String targetedRegions, + boolean deferredSwap) { AddVersion addVersion = (AddVersion) AdminMessageType.ADD_VERSION.getNewInstance(); addVersion.clusterName = clusterName; addVersion.storeName = storeName; @@ -1522,6 +1600,7 @@ private byte[] getAddVersionMessage( addVersion.numberOfPartitions = numberOfPartitions; addVersion.rewindTimeInSecondsOverride = -1; addVersion.timestampMetadataVersionId = 1; + addVersion.versionSwapDeferred = deferredSwap; if (targetedRegions != null) { addVersion.targetedRegions = new ArrayList<>(RegionUtils.parseRegionsFilterList(targetedRegions)); } From 74016227214cbe3684dac2e23fecc9d8c14bd151 Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Fri, 6 Dec 2024 15:17:01 -0800 Subject: [PATCH 05/20] set IsDavinciHeartbeatReported --- .../venice/pushmonitor/PushStatusCollector.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java index 7faa595a544..0cd5349c5b7 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java @@ -171,6 +171,17 @@ private void scanDaVinciPushStatus() { } else { topicToNoDaVinciStatusRetryCountMap.remove(pushStatus.topicName); } + + // If there is some status for a davinci push, mark that there is a dvc heartbeat reported for the latest version + String storeName = Version.parseStoreFromKafkaTopicName(pushStatus.topicName); + Store store = storeRepository.getStore(storeName); + if (!store.getIsDavinciHeartbeatReported() && daVinciStatus.getStatus() != ExecutionStatus.NOT_CREATED) { + int versionNum = Version.parseVersionFromVersionTopicName(pushStatus.topicName); + Version version = store.getVersion(versionNum); + version.setIsDavinciHeartbeatReported(true); + storeRepository.updateStore(store); + } + LOGGER.info( "Received DaVinci status: {} with details: {} for topic: {}", daVinciStatus.getStatus(), From c130625cbf011978b787935aab38fe70d52e1650 Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Fri, 6 Dec 2024 16:49:55 -0800 Subject: [PATCH 06/20] fix exception from read only version --- .../java/com/linkedin/venice/meta/AbstractStore.java | 9 +++++++++ .../java/com/linkedin/venice/meta/ReadOnlyStore.java | 5 +++++ .../src/main/java/com/linkedin/venice/meta/Store.java | 2 ++ .../linkedin/venice/pushmonitor/PushStatusCollector.java | 3 +-- 4 files changed, 17 insertions(+), 2 deletions(-) diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java index 9c4e56a5082..40ac5cb6fa1 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java @@ -348,4 +348,13 @@ public void fixMissingFields() { } } } + + @Override + public void updateVersionForDaVinciHeartbeat(int versionNumber, boolean reported) { + for (StoreVersion version: storeVersionsSupplier.getForUpdate()) { + if (version.getNumber() == versionNumber) { + version.setIsDaVinciHeartBeatReported(reported); + } + } + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java index 5271e879152..c531e7d2b8b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java @@ -1497,6 +1497,11 @@ public boolean getIsDavinciHeartbeatReported() { return delegate.getIsDavinciHeartbeatReported(); } + @Override + public void updateVersionForDaVinciHeartbeat(int versionNumber, boolean reported) { + throw new UnsupportedOperationException(); + } + @Override public String toString() { return this.delegate.toString(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java index 8b0c5b0710a..03a4b7f9261 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java @@ -341,4 +341,6 @@ static boolean isSystemStore(String storeName) { void setIsDavinciHeartbeatReported(boolean isReported); boolean getIsDavinciHeartbeatReported(); + + void updateVersionForDaVinciHeartbeat(int versionNumber, boolean reported); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java index 0cd5349c5b7..2fee6e36c0b 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java @@ -177,8 +177,7 @@ private void scanDaVinciPushStatus() { Store store = storeRepository.getStore(storeName); if (!store.getIsDavinciHeartbeatReported() && daVinciStatus.getStatus() != ExecutionStatus.NOT_CREATED) { int versionNum = Version.parseVersionFromVersionTopicName(pushStatus.topicName); - Version version = store.getVersion(versionNum); - version.setIsDavinciHeartbeatReported(true); + store.updateVersionForDaVinciHeartbeat(versionNum, true); storeRepository.updateStore(store); } From e3f65d2d4e402ff0ced16d7d4288cb8678f66bd4 Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Mon, 9 Dec 2024 14:10:58 -0800 Subject: [PATCH 07/20] update config comment --- .../linkedin/venice/hadoop/VenicePushJob.java | 17 +++++++++-------- .../venice/vpj/VenicePushJobConstants.java | 3 ++- .../venice/hadoop/VenicePushJobTest.java | 3 +++ 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java index b7c846227de..d724ca70d71 100755 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java @@ -400,6 +400,15 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) { if (pushJobSettingToReturn.isIncrementalPush && pushJobSettingToReturn.isTargetedRegionPushEnabled) { throw new VeniceException("Incremental push is not supported while using targeted region push mode"); } + + // If target region push with deferred version swap is enabled, enable deferVersionSwap and + // isTargetedRegionPushEnabled + if (props.getBoolean(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, false)) { + pushJobSettingToReturn.deferVersionSwap = true; + pushJobSettingToReturn.isTargetedRegionPushEnabled = true; + pushJobSettingToReturn.isTargetRegionPushWithDeferredSwapEnabled = true; + } + if (props.containsKey(TARGETED_REGION_PUSH_LIST)) { if (pushJobSettingToReturn.isTargetedRegionPushEnabled) { pushJobSettingToReturn.targetedRegions = props.getString(TARGETED_REGION_PUSH_LIST); @@ -503,14 +512,6 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) { pushJobSettingToReturn.dataWriterComputeJobClass = objectClass; } - // If target region push with deferred version swap is enabled, enable deferVersionSwap and - // isTargetedRegionPushEnabled - if (props.getBoolean(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, false)) { - pushJobSettingToReturn.deferVersionSwap = true; - pushJobSettingToReturn.isTargetedRegionPushEnabled = true; - pushJobSettingToReturn.isTargetRegionPushWithDeferredSwapEnabled = true; - } - return pushJobSettingToReturn; } diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java index 994b0ca6071..093b7842319 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java @@ -327,7 +327,8 @@ private VenicePushJobConstants() { /** * Config to enable target region push with deferred version swap * In this mode, the VPJ will push data to all regions and only switch to the new version in a single region. - * The single region is decided by the store config in {@link StoreInfo#getNativeReplicationSourceFabric()}}. + * The single region is decided by the store config in {@link StoreInfo#getNativeReplicationSourceFabric()}} unless + * a list of regions is passed in from targeted.region.push.list * After a specified wait time (default 1h), the remaining regions will switch to the new version. */ public static final String TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP = "targeted.region.push.with.deferred.swap"; diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java index 190cb15a85f..e83b2d33ed7 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java @@ -927,14 +927,17 @@ public void testGetVeniceWriter() { @Test public void testTargetRegionPushWithDeferredSwapSettings() { Properties props = getVpjRequiredProperties(); + String regions = "test1, test2"; props.put(KEY_FIELD_PROP, "id"); props.put(VALUE_FIELD_PROP, "name"); props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true); + props.put(TARGETED_REGION_PUSH_LIST, regions); try (VenicePushJob pushJob = getSpyVenicePushJob(props, getClient())) { PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); Assert.assertEquals(pushJobSetting.deferVersionSwap, true); Assert.assertEquals(pushJobSetting.isTargetedRegionPushEnabled, true); + Assert.assertEquals(pushJobSetting.targetedRegions, regions); } } From f84707fdb525c1db20a0ad9b19b45b6ea67d6eea Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Tue, 10 Dec 2024 10:21:47 -0800 Subject: [PATCH 08/20] add log, combine booleans --- .../venice/pushmonitor/AbstractPushMonitor.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index 56e5939fe39..c960dd05664 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -1135,11 +1135,19 @@ private void updateStoreVersionStatus(String storeName, int versionNumber, Versi * after targetSwapRegionWaitTime passes) */ Set targetRegions = RegionUtils.parseRegionsFilterList(version.getTargetSwapRegion()); - boolean isValidTargetRegion = targetRegions.contains(regionName); - boolean isTargetRegionPushWithDeferredSwap = version.isVersionSwapDeferred() && isValidTargetRegion; + boolean isTargetRegionPushWithDeferredSwap = + version.isVersionSwapDeferred() && targetRegions.contains(regionName); boolean isNormalPush = !version.isVersionSwapDeferred(); boolean isDeferredSwap = version.isVersionSwapDeferred() && targetRegions.isEmpty(); if (isTargetRegionPushWithDeferredSwap || isNormalPush) { + LOGGER.debug( + "Swapping to version {} for store {} in region {}. isTargetRegionPushWithDeferredSwap is {} and" + + "isNormalPush is {}", + versionNumber, + store.getName(), + regionName, + isTargetRegionPushWithDeferredSwap, + isNormalPush); int previousVersion = store.getCurrentVersion(); store.setCurrentVersion(versionNumber); realTimeTopicSwitcher.transmitVersionSwapMessage(store, previousVersion, versionNumber); From 80adb8bc13c7158a5328b09f041839425dad6134 Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Tue, 10 Dec 2024 17:43:46 -0800 Subject: [PATCH 09/20] add test for isDavinciHeartbeatReported --- .../linkedin/venice/meta/AbstractStore.java | 8 ++++-- .../com/linkedin/venice/meta/VersionImpl.java | 1 + .../venice/endToEnd/DaVinciClientTest.java | 27 +++++++++++++++++++ .../pushmonitor/PushStatusCollector.java | 1 + 4 files changed, 35 insertions(+), 2 deletions(-) diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java index 40ac5cb6fa1..b786655561e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/AbstractStore.java @@ -351,9 +351,13 @@ public void fixMissingFields() { @Override public void updateVersionForDaVinciHeartbeat(int versionNumber, boolean reported) { - for (StoreVersion version: storeVersionsSupplier.getForUpdate()) { + checkVersionSupplier(); + + for (StoreVersion storeVersion: storeVersionsSupplier.getForUpdate()) { + Version version = new VersionImpl(storeVersion); if (version.getNumber() == versionNumber) { - version.setIsDaVinciHeartBeatReported(reported); + version.setIsDavinciHeartbeatReported(reported); + return; } } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java index 37a4a3c638f..6406c3a4027 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java @@ -507,6 +507,7 @@ public Version cloneVersion() { clonedVersion.setBlobTransferEnabled(isBlobTransferEnabled()); clonedVersion.setTargetSwapRegion(getTargetSwapRegion()); clonedVersion.setTargetSwapRegionWaitTime(getTargetSwapRegionWaitTime()); + clonedVersion.setIsDavinciHeartbeatReported(getIsDavinciHeartbeatReported()); return clonedVersion; } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 5714633ec95..47c1b42717d 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -75,6 +75,7 @@ import com.linkedin.venice.integration.utils.VeniceRouterWrapper; import com.linkedin.venice.meta.IngestionMetadataUpdateType; import com.linkedin.venice.meta.IngestionMode; +import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.Version; import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.partitioner.ConstantVenicePartitioner; @@ -1547,6 +1548,32 @@ public void testDVCSnapshotGeneration(boolean useDiskStorage) throws Exception { } } + @Test + public void testIsDavinciHeartbeatReported() throws Exception { + String storeName = Utils.getUniqueString("testIsDavinviHeartbeatReported"); + Consumer paramsConsumer = params -> params.setTargetRegionSwap("test"); + setUpStore(storeName, paramsConsumer, properties -> {}, true); + + try (DaVinciClient client = ServiceFactory.getGenericAvroDaVinciClient(storeName, cluster)) { + client.subscribeAll().get(); + } + + Properties vpjProperties = defaultVPJProps(cluster, inputDirPath, storeName); + runVPJ(vpjProperties, 2, cluster); + + try (ControllerClient controllerClient = cluster.getControllerClient()) { + StoreInfo store = controllerClient.getStore(storeName).getStore(); + TestUtils.waitForNonDeterministicAssertion( + 2, + TimeUnit.MILLISECONDS, + () -> Assert.assertEquals(store.getIsDavinciHeartbeatReported(), true)); + TestUtils.waitForNonDeterministicAssertion( + 2, + TimeUnit.MILLISECONDS, + () -> Assert.assertEquals(store.getVersion(2).get().getIsDavinciHeartbeatReported(), true)); + } + } + private void setupHybridStore(String storeName, Consumer paramsConsumer) throws Exception { setupHybridStore(storeName, paramsConsumer, KEY_COUNT); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java index 2fee6e36c0b..c1981529ac5 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java @@ -178,6 +178,7 @@ private void scanDaVinciPushStatus() { if (!store.getIsDavinciHeartbeatReported() && daVinciStatus.getStatus() != ExecutionStatus.NOT_CREATED) { int versionNum = Version.parseVersionFromVersionTopicName(pushStatus.topicName); store.updateVersionForDaVinciHeartbeat(versionNum, true); + store.setIsDavinciHeartbeatReported(true); storeRepository.updateStore(store); } From fe4943bf28bc8f0f7a3e153f2879fa27055b327c Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Tue, 10 Dec 2024 20:11:27 -0800 Subject: [PATCH 10/20] add input dir --- .../linkedin/venice/endToEnd/DaVinciClientTest.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 47c1b42717d..8f5ee113deb 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -1548,7 +1548,7 @@ public void testDVCSnapshotGeneration(boolean useDiskStorage) throws Exception { } } - @Test + @Test(timeOut = TEST_TIMEOUT) public void testIsDavinciHeartbeatReported() throws Exception { String storeName = Utils.getUniqueString("testIsDavinviHeartbeatReported"); Consumer paramsConsumer = params -> params.setTargetRegionSwap("test"); @@ -1558,7 +1558,14 @@ public void testIsDavinciHeartbeatReported() throws Exception { client.subscribeAll().get(); } - Properties vpjProperties = defaultVPJProps(cluster, inputDirPath, storeName); + File inputDirectory = getTempDataDirectory(); + String inputDirectoryPath = "file://" + inputDirectory.getAbsolutePath(); + try { + writeSimpleAvroFileWithIntToStringSchema(inputDirectory); + } catch (IOException e) { + throw new RuntimeException(e); + } + Properties vpjProperties = defaultVPJProps(cluster, inputDirectoryPath, storeName); runVPJ(vpjProperties, 2, cluster); try (ControllerClient controllerClient = cluster.getControllerClient()) { From 5a5393cd8554f9a9f2bbaa5f47fddc03d1eb5d0a Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Tue, 10 Dec 2024 21:25:16 -0800 Subject: [PATCH 11/20] add unit test for UpdateVersionForDaVinciHeartbeat --- .../test/java/com/linkedin/venice/meta/TestZKStore.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestZKStore.java b/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestZKStore.java index 7f58f92dd28..75d1dd03d16 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestZKStore.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestZKStore.java @@ -436,4 +436,13 @@ public void testStoreLevelAcl() { 1); Assert.assertTrue(store.isAccessControlled()); } + + @Test + public void testUpdateVersionForDaVinciHeartbeat() { + String storeName = "testUpdateVersionForDaVinciHeartbeat"; + Store store = TestUtils.createTestStore(storeName, "owner", System.currentTimeMillis()); + store.addVersion(new VersionImpl(storeName, 1, "test")); + store.updateVersionForDaVinciHeartbeat(1, true); + Assert.assertTrue(store.getVersion(1).getIsDavinciHeartbeatReported()); + } } From f4bb1f3229ba4d5a7ec0146df5d3c8be269b0d74 Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Wed, 11 Dec 2024 12:32:09 -0800 Subject: [PATCH 12/20] update log statements and combine assert statements --- .../linkedin/venice/endToEnd/DaVinciClientTest.java | 12 ++++-------- .../venice/pushmonitor/AbstractPushMonitor.java | 7 ++++--- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 8f5ee113deb..ed88c956ea7 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -1570,14 +1570,10 @@ public void testIsDavinciHeartbeatReported() throws Exception { try (ControllerClient controllerClient = cluster.getControllerClient()) { StoreInfo store = controllerClient.getStore(storeName).getStore(); - TestUtils.waitForNonDeterministicAssertion( - 2, - TimeUnit.MILLISECONDS, - () -> Assert.assertEquals(store.getIsDavinciHeartbeatReported(), true)); - TestUtils.waitForNonDeterministicAssertion( - 2, - TimeUnit.MILLISECONDS, - () -> Assert.assertEquals(store.getVersion(2).get().getIsDavinciHeartbeatReported(), true)); + TestUtils.waitForNonDeterministicAssertion(2, TimeUnit.MILLISECONDS, () -> { + Assert.assertTrue(store.getIsDavinciHeartbeatReported()); + Assert.assertTrue(store.getVersion(2).get().getIsDavinciHeartbeatReported()); + }); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index c960dd05664..5c3dc332b52 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -1140,7 +1140,7 @@ private void updateStoreVersionStatus(String storeName, int versionNumber, Versi boolean isNormalPush = !version.isVersionSwapDeferred(); boolean isDeferredSwap = version.isVersionSwapDeferred() && targetRegions.isEmpty(); if (isTargetRegionPushWithDeferredSwap || isNormalPush) { - LOGGER.debug( + LOGGER.info( "Swapping to version {} for store {} in region {}. isTargetRegionPushWithDeferredSwap is {} and" + "isNormalPush is {}", versionNumber, @@ -1158,10 +1158,11 @@ private void updateStoreVersionStatus(String storeName, int versionNumber, Versi versionNumber); } else { LOGGER.info( - "Version swap is deferred for store {} on version {} in region {} due to target region push w/ deferred swap", + "Version swap is deferred for store {} on version {} in region {} because it is not in the target regions: {}", store.getName(), versionNumber, - regionName); + regionName, + targetRegions); } } else { LOGGER.info( From 551542cc0a7c1bba48b0916f379e567522264b8c Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Wed, 11 Dec 2024 12:34:38 -0800 Subject: [PATCH 13/20] update log --- .../com/linkedin/venice/pushmonitor/AbstractPushMonitor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index 5c3dc332b52..8b1ba119755 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -1141,8 +1141,8 @@ private void updateStoreVersionStatus(String storeName, int versionNumber, Versi boolean isDeferredSwap = version.isVersionSwapDeferred() && targetRegions.isEmpty(); if (isTargetRegionPushWithDeferredSwap || isNormalPush) { LOGGER.info( - "Swapping to version {} for store {} in region {}. isTargetRegionPushWithDeferredSwap is {} and" - + "isNormalPush is {}", + "Swapping to version {} for store {} in region {} during " + + (isNormalPush ? "normal push" : "target region push with deferred version swap"), versionNumber, store.getName(), regionName, From 2e083b909084cb032fd3838540456483bd00d0fb Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Wed, 11 Dec 2024 15:53:45 -0800 Subject: [PATCH 14/20] only use isTargetRegionPushWithDeferredSwapEnabled config --- .../com/linkedin/venice/hadoop/VenicePushJob.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java index d724ca70d71..77921c0482f 100755 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java @@ -405,12 +405,12 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) { // isTargetedRegionPushEnabled if (props.getBoolean(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, false)) { pushJobSettingToReturn.deferVersionSwap = true; - pushJobSettingToReturn.isTargetedRegionPushEnabled = true; pushJobSettingToReturn.isTargetRegionPushWithDeferredSwapEnabled = true; } if (props.containsKey(TARGETED_REGION_PUSH_LIST)) { - if (pushJobSettingToReturn.isTargetedRegionPushEnabled) { + if (pushJobSettingToReturn.isTargetedRegionPushEnabled + || pushJobSetting.isTargetRegionPushWithDeferredSwapEnabled) { pushJobSettingToReturn.targetedRegions = props.getString(TARGETED_REGION_PUSH_LIST); } else { throw new VeniceException("Targeted region push list is only supported when targeted region push is enabled"); @@ -854,7 +854,7 @@ public void run() { updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.JOB_STATUS_POLLING_COMPLETED); // Do not mark completed yet as for target region push it will be marked inside postValidationConsumption - if (!pushJobSetting.isTargetedRegionPushEnabled) { + if (!pushJobSetting.isTargetedRegionPushEnabled && !pushJobSetting.isTargetRegionPushWithDeferredSwapEnabled) { pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.COMPLETED.getValue())); } pushJobDetails.jobDurationInMs = LatencyUtils.getElapsedTimeFromMsToMs(pushJobSetting.jobStartTimeMs); @@ -2085,7 +2085,8 @@ private void validateStoreSettingAndPopulate(ControllerClient controllerClient, jobSetting.storeStorageQuota = storeResponse.getStore().getStorageQuotaInByte(); // Do not enable for deferred swap or hybrid store - boolean isDeferredSwap = pushJobSetting.deferVersionSwap && !pushJobSetting.isTargetedRegionPushEnabled; + boolean isDeferredSwap = + pushJobSetting.deferVersionSwap && !pushJobSetting.isTargetRegionPushWithDeferredSwapEnabled; if (isDeferredSwap || storeResponse.getStore().getHybridStoreConfig() != null) { LOGGER.warn( "target region is not available for {} as it hybrid or deferred version swap enabled.", @@ -2093,7 +2094,8 @@ private void validateStoreSettingAndPopulate(ControllerClient controllerClient, jobSetting.isTargetedRegionPushEnabled = false; } - if (jobSetting.isTargetedRegionPushEnabled && jobSetting.targetedRegions == null) { + if ((jobSetting.isTargetedRegionPushEnabled || jobSetting.isTargetRegionPushWithDeferredSwapEnabled) + && jobSetting.targetedRegions == null) { // only override the targeted regions if it is not set and it is a single region push // use source grid fabric as target region to reduce data hop, else use default NR source if (!StringUtils.isEmpty(jobSetting.sourceGridFabric)) { From eefa4e7b8068636212182449245277bdec9045a6 Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Wed, 11 Dec 2024 16:20:09 -0800 Subject: [PATCH 15/20] fix push job test --- .../src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java | 2 +- .../test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java index 77921c0482f..f0c165f4d33 100755 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java @@ -410,7 +410,7 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) { if (props.containsKey(TARGETED_REGION_PUSH_LIST)) { if (pushJobSettingToReturn.isTargetedRegionPushEnabled - || pushJobSetting.isTargetRegionPushWithDeferredSwapEnabled) { + || pushJobSettingToReturn.isTargetRegionPushWithDeferredSwapEnabled) { pushJobSettingToReturn.targetedRegions = props.getString(TARGETED_REGION_PUSH_LIST); } else { throw new VeniceException("Targeted region push list is only supported when targeted region push is enabled"); diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java index e83b2d33ed7..07b7a169953 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java @@ -936,7 +936,7 @@ public void testTargetRegionPushWithDeferredSwapSettings() { try (VenicePushJob pushJob = getSpyVenicePushJob(props, getClient())) { PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); Assert.assertEquals(pushJobSetting.deferVersionSwap, true); - Assert.assertEquals(pushJobSetting.isTargetedRegionPushEnabled, true); + Assert.assertEquals(pushJobSetting.isTargetRegionPushWithDeferredSwapEnabled, true); Assert.assertEquals(pushJobSetting.targetedRegions, regions); } } From 5c8b9aec0d039454acf22a5872b55ab85e5f5c2d Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Wed, 11 Dec 2024 17:19:59 -0800 Subject: [PATCH 16/20] only allow one target region config to be enabled at once --- .../java/com/linkedin/venice/hadoop/VenicePushJob.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java index f0c165f4d33..660f2c0b234 100755 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java @@ -408,6 +408,13 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) { pushJobSettingToReturn.isTargetRegionPushWithDeferredSwapEnabled = true; } + if (pushJobSettingToReturn.isTargetRegionPushWithDeferredSwapEnabled + && pushJobSettingToReturn.isTargetedRegionPushEnabled) { + throw new VeniceException( + "Target region push and target region push with deferred version swap cannot be enabled" + + " at the same time"); + } + if (props.containsKey(TARGETED_REGION_PUSH_LIST)) { if (pushJobSettingToReturn.isTargetedRegionPushEnabled || pushJobSettingToReturn.isTargetRegionPushWithDeferredSwapEnabled) { From 1191f3c89ef2b549c22b5918682fb75027dcc95a Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Wed, 11 Dec 2024 22:31:11 -0800 Subject: [PATCH 17/20] add test for enabling both target region configs --- .../venice/hadoop/VenicePushJobTest.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java index 07b7a169953..6881cf4295e 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java @@ -941,6 +941,26 @@ public void testTargetRegionPushWithDeferredSwapSettings() { } } + @Test + public void testEnableBothTargetRegionConfigs() { + Properties props = getVpjRequiredProperties(); + String regions = "test1, test2"; + props.put(KEY_FIELD_PROP, "id"); + props.put(VALUE_FIELD_PROP, "name"); + props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true); + props.put(TARGETED_REGION_PUSH_ENABLED, true); + props.put(TARGETED_REGION_PUSH_LIST, regions); + + try { + getSpyVenicePushJob(props, getClient()); + } catch (Exception e) { + assertEquals( + e.getMessage(), + "Target region push and target region push with deferred version swap cannot be enabled" + + " at the same time"); + } + } + private JobStatusQueryResponse mockJobStatusQuery() { JobStatusQueryResponse response = new JobStatusQueryResponse(); response.setStatus(ExecutionStatus.COMPLETED.toString()); From 693f2f6ea233d7389f51d840ef0dd093e6f7d283 Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Thu, 12 Dec 2024 10:36:23 -0800 Subject: [PATCH 18/20] remove try catch --- .../com/linkedin/venice/hadoop/VenicePushJobTest.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java index 6881cf4295e..083606476ef 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java @@ -941,7 +941,7 @@ public void testTargetRegionPushWithDeferredSwapSettings() { } } - @Test + @Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = ".*cannot be enabled at the same time.*") public void testEnableBothTargetRegionConfigs() { Properties props = getVpjRequiredProperties(); String regions = "test1, test2"; @@ -951,14 +951,7 @@ public void testEnableBothTargetRegionConfigs() { props.put(TARGETED_REGION_PUSH_ENABLED, true); props.put(TARGETED_REGION_PUSH_LIST, regions); - try { - getSpyVenicePushJob(props, getClient()); - } catch (Exception e) { - assertEquals( - e.getMessage(), - "Target region push and target region push with deferred version swap cannot be enabled" - + " at the same time"); - } + getSpyVenicePushJob(props, getClient()); } private JobStatusQueryResponse mockJobStatusQuery() { From 7887d06331d36d51652ee47f73437e7a7669b5a0 Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Thu, 12 Dec 2024 11:47:26 -0800 Subject: [PATCH 19/20] update log statement --- .../venice/pushmonitor/AbstractPushMonitor.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index 8b1ba119755..2d408e703ef 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -1151,18 +1151,18 @@ private void updateStoreVersionStatus(String storeName, int versionNumber, Versi int previousVersion = store.getCurrentVersion(); store.setCurrentVersion(versionNumber); realTimeTopicSwitcher.transmitVersionSwapMessage(store, previousVersion, versionNumber); - } else if (isDeferredSwap) { - LOGGER.info( - "Version swap is deferred for store {} on version {}. Skipping version swap.", + } else { + String deferredSwapLog = String.format( + "Version swap is deferred for store %s on version %d. Skipping version swap", store.getName(), versionNumber); - } else { - LOGGER.info( - "Version swap is deferred for store {} on version {} in region {} because it is not in the target regions: {}", + String targetRegionSwapDeferredLog = String.format( + "Version swap is deferred for store %s on version %d in region %s because it is not in the target regions: %s", store.getName(), versionNumber, regionName, - targetRegions); + version.getTargetSwapRegion()); + LOGGER.info(isDeferredSwap ? deferredSwapLog : targetRegionSwapDeferredLog); } } else { LOGGER.info( From 2e12024afd4b3c6f0a8e8758a040d356a28f3ecf Mon Sep 17 00:00:00 2001 From: Michelle Kwong Date: Thu, 12 Dec 2024 11:57:23 -0800 Subject: [PATCH 20/20] remove %s/d usage --- .../venice/pushmonitor/AbstractPushMonitor.java | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index 2d408e703ef..30a6be4d424 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -1152,17 +1152,12 @@ private void updateStoreVersionStatus(String storeName, int versionNumber, Versi store.setCurrentVersion(versionNumber); realTimeTopicSwitcher.transmitVersionSwapMessage(store, previousVersion, versionNumber); } else { - String deferredSwapLog = String.format( - "Version swap is deferred for store %s on version %d. Skipping version swap", - store.getName(), - versionNumber); - String targetRegionSwapDeferredLog = String.format( - "Version swap is deferred for store %s on version %d in region %s because it is not in the target regions: %s", + LOGGER.info( + "Version swap is deferred for store {} on version {} in region {} because " + + (isDeferredSwap ? "deferred version swap is enabled" : "it is not in the target regions"), store.getName(), versionNumber, - regionName, - version.getTargetSwapRegion()); - LOGGER.info(isDeferredSwap ? deferredSwapLog : targetRegionSwapDeferredLog); + regionName); } } else { LOGGER.info(