Skip to content

Commit

Permalink
[controller] Make PS3 incremental push status reads config cluster-sp…
Browse files Browse the repository at this point in the history
…ecific

- Enable incremental push status reads from PS3 based on cluster
  configuration.  
- Query the push status store only if it’s enabled for the specific user store;
  otherwise, fallback to Zookeeper for status reads.  
- Ensure removal of removeFromSupposedlyOngoingIncrementalPushVersions status from
  PS3 when PS3 is enabled, regardless of read configuration.
  • Loading branch information
sushantmane authored Oct 8, 2024
1 parent d65cb71 commit 6901820
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ public class VeniceControllerClusterConfig {
private final List<String> helixCloudInfoSources;
private final String helixCloudInfoProcessorName;

private final boolean usePushStatusStoreForIncrementalPush;
private final boolean usePushStatusStoreForIncrementalPushStatusReads;

private final long metaStoreWriterCloseTimeoutInMS;

Expand Down Expand Up @@ -899,7 +899,8 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
props.getBoolean(CONTROLLER_AUTO_MATERIALIZE_META_SYSTEM_STORE, false);
this.isAutoMaterializeDaVinciPushStatusSystemStoreEnabled =
props.getBoolean(CONTROLLER_AUTO_MATERIALIZE_DAVINCI_PUSH_STATUS_SYSTEM_STORE, false);
this.usePushStatusStoreForIncrementalPush = props.getBoolean(USE_PUSH_STATUS_STORE_FOR_INCREMENTAL_PUSH, false);
this.usePushStatusStoreForIncrementalPushStatusReads =
props.getBoolean(USE_PUSH_STATUS_STORE_FOR_INCREMENTAL_PUSH, false);
this.metaStoreWriterCloseTimeoutInMS = props.getLong(META_STORE_WRITER_CLOSE_TIMEOUT_MS, 300000L);
this.metaStoreWriterCloseConcurrency = props.getInt(META_STORE_WRITER_CLOSE_CONCURRENCY, -1);
this.emergencySourceRegion = props.getString(EMERGENCY_SOURCE_REGION, "");
Expand Down Expand Up @@ -1593,7 +1594,7 @@ public String getHelixCloudInfoProcessorName() {
}

public boolean usePushStatusStoreForIncrementalPush() {
return usePushStatusStoreForIncrementalPush;
return usePushStatusStoreForIncrementalPushStatusReads;
}

public long getMetaStoreWriterCloseTimeoutInMS() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ public class VeniceHelixAdmin implements Admin, StoreCleaner {
private final MetaStoreReader metaStoreReader;
private final D2Client d2Client;
private final Map<String, HelixReadWriteLiveClusterConfigRepository> clusterToLiveClusterConfigRepo;
private final boolean usePushStatusStoreToReadServerIncrementalPushStatus;
private static final String ZK_INSTANCES_SUB_PATH = "INSTANCES";
private static final String ZK_CUSTOMIZEDSTATES_SUB_PATH = "CUSTOMIZEDSTATES/" + HelixPartitionState.OFFLINE_PUSH;

Expand Down Expand Up @@ -555,7 +554,6 @@ public VeniceHelixAdmin(
isControllerClusterHAAS = commonConfig.isControllerClusterLeaderHAAS();
coloLeaderClusterName = commonConfig.getClusterName();
pushJobStatusStoreClusterName = commonConfig.getPushJobStatusStoreClusterName();
usePushStatusStoreToReadServerIncrementalPushStatus = commonConfig.usePushStatusStoreForIncrementalPush();

zkSharedSystemStoreRepository = new SharedHelixReadOnlyZKSharedSystemStoreRepository(
zkClient,
Expand Down Expand Up @@ -6052,7 +6050,7 @@ public OfflinePushStatusInfo getOffLinePushStatus(
// if status is not SOIP remove incremental push version from the supposedlyOngoingIncrementalPushVersions
if (incrementalPushVersion.isPresent()
&& (status == ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED || status == ExecutionStatus.NOT_CREATED)
&& usePushStatusStoreToReadServerIncrementalPushStatus) {
&& store.isDaVinciPushStatusStoreEnabled()) {
getPushStatusStoreWriter().removeFromSupposedlyOngoingIncrementalPushVersions(
store.getName(),
versionNumber,
Expand All @@ -6065,17 +6063,27 @@ private ExecutionStatusWithDetails getIncrementalPushStatus(
String clusterName,
String kafkaTopic,
String incrementalPushVersion,
PushMonitor monitor) {
PushMonitor monitor,
Store store) {
HelixCustomizedViewOfflinePushRepository cvRepo =
getHelixVeniceClusterResources(clusterName).getCustomizedViewRepository();
if (!usePushStatusStoreToReadServerIncrementalPushStatus) {
return monitor.getIncrementalPushStatusAndDetails(kafkaTopic, incrementalPushVersion, cvRepo);
boolean usePushStatusStoreForIncrementalPush =
multiClusterConfigs.getControllerConfig(clusterName).usePushStatusStoreForIncrementalPush();
if (usePushStatusStoreForIncrementalPush) {
if (store.isDaVinciPushStatusStoreEnabled()) {
return monitor.getIncrementalPushStatusFromPushStatusStore(
kafkaTopic,
incrementalPushVersion,
cvRepo,
getPushStatusStoreReader());
} else {
LOGGER.warn(
"Push status system store is not enabled for store: {} in cluster: {}, using ZK for incremental push status reads",
store.getName(),
clusterName);
}
}
return monitor.getIncrementalPushStatusFromPushStatusStore(
kafkaTopic,
incrementalPushVersion,
cvRepo,
getPushStatusStoreReader());
return monitor.getIncrementalPushStatusAndDetails(kafkaTopic, incrementalPushVersion, cvRepo);
}

private OfflinePushStatusInfo getOfflinePushStatusInfo(
Expand All @@ -6102,7 +6110,8 @@ private OfflinePushStatusInfo getOfflinePushStatusInfo(
}

if (incrementalPushVersion.isPresent()) {
statusAndDetails = getIncrementalPushStatus(clusterName, kafkaTopic, incrementalPushVersion.get(), monitor);
statusAndDetails =
getIncrementalPushStatus(clusterName, kafkaTopic, incrementalPushVersion.get(), monitor, store);
} else {
statusAndDetails = monitor.getPushStatusAndDetails(kafkaTopic);
}
Expand Down

0 comments on commit 6901820

Please sign in to comment.