Skip to content

Commit

Permalink
[controller] Made disable replica service interval configurable (#811)
Browse files Browse the repository at this point in the history
* [controller] Made disable replica service interval configurable

Made disable replica service interval configurable so that it can be changed based on need. Also some logging changes in store cleanup service and skipping stores without any version.


---------

Co-authored-by: Sourav Maji <[email protected]>
  • Loading branch information
majisourav99 and Sourav Maji authored Jan 4, 2024
1 parent 16ed564 commit 6827575
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -377,10 +377,13 @@ private ConfigKeys() {
"kme.registration.from.message.header.enabled";

/**
* The following config is to control whether to enable backup version cleanup based on retention policy or not at cluster level.
* The following config is to control whether to turn on disabled replica enabler service.
*/
public static final String CONTROLLER_ENABLE_DISABLED_REPLICA_ENABLED = "controller.enable.disabled.replica.enabled";

public static final String CONTROLLER_DISABLED_REPLICA_ENABLER_INTERVAL_MS =
"controller.disabled.replica.enabler.interval.ms";

// Server specific configs
public static final String LISTENER_PORT = "listener.port";
public static final String GRPC_READ_SERVER_PORT = "grpc.read.server.port";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.Time;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -36,7 +35,7 @@ public DisabledPartitionEnablerService(
this.admin = admin;
this.multiClusterConfig = multiClusterConfig;
this.allClusters = multiClusterConfig.getClusters();
this.sleepInterval = TimeUnit.HOURS.toMillis(10);
this.sleepInterval = multiClusterConfig.getDisabledReplicaEnablerServiceIntervalMs();
this.time = time;
this.cleanupThread = new Thread(new DisabledPartitionEnablerTask(), "StoreBackupVersionCleanupTask");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.meta.Store.NON_EXISTING_VERSION;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.davinci.listener.response.ServerCurrentVersionResponse;
import com.linkedin.venice.ConfigKeys;
Expand Down Expand Up @@ -146,11 +148,11 @@ private boolean validateAllRouterOnCurrentVersion(Store store, String clusterNam
for (Instance routerInstance: liveRouterInstances) {
try {
HttpGet routerRequest =
new HttpGet(routerInstance.getUrl() + "/" + TYPE_CURRENT_VERSION + "/" + store.getName());
new HttpGet(routerInstance.getUrl(true) + "/" + TYPE_CURRENT_VERSION + "/" + store.getName());
HttpResponse response = getHttpAsyncClient().execute(routerRequest, null).get(500, TimeUnit.MILLISECONDS);
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
LOGGER.warn(
"Got status code {} from host {} while querying current version for store {}",
"Got status code {} from host {} while querying router current version for store {}",
response.getStatusLine().getStatusCode(),
routerInstance,
store.getName());
Expand All @@ -166,7 +168,7 @@ private boolean validateAllRouterOnCurrentVersion(Store store, String clusterNam
return false;
}
} catch (Exception e) {
LOGGER.error("Got exception while getting current version for store {}", store.getName(), e);
LOGGER.error("Got exception while getting router current version for store {}", store.getName(), e);
return false;
}
}
Expand All @@ -179,11 +181,11 @@ private boolean validateAllServerOnCurrentVersion(Store store, String clusterNam
for (Instance instance: instances) {
try {
HttpGet routerRequest = new HttpGet(
instance.getUrl() + "/" + QueryAction.CURRENT_VERSION.toString().toLowerCase() + "/" + store.getName());
instance.getUrl(true) + "/" + QueryAction.CURRENT_VERSION.toString().toLowerCase() + "/" + store.getName());
HttpResponse response = getHttpAsyncClient().execute(routerRequest, null).get(500, TimeUnit.MILLISECONDS);
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
LOGGER.warn(
"Got status code {} from host {} while querying current version for store {}",
"Got status code {} from host {} while querying server current version for store {}",
response.getStatusLine().getStatusCode(),
instance,
store.getName());
Expand Down Expand Up @@ -220,6 +222,10 @@ protected boolean cleanupBackupVersion(Store store, String clusterName) {
List<Version> readyToBeRemovedVersions = new ArrayList<>();
int currentVersion = store.getCurrentVersion();

if (currentVersion == NON_EXISTING_VERSION) {
return false;
}

// Do not delete version unless all routers and all servers are on same current version
if (multiClusterConfig.getControllerConfig(clusterName).isBackupVersionMetadataFetchBasedCleanupEnabled()
&& (!validateAllRouterOnCurrentVersion(store, clusterName, currentVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static com.linkedin.venice.ConfigKeys.CONTROLLER_CLUSTER_LEADER_HAAS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_CLUSTER_REPLICA;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_CLUSTER_ZK_ADDRESSS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DISABLED_REPLICA_ENABLER_INTERVAL_MS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DISABLED_ROUTES;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DISABLE_PARENT_TOPIC_TRUNCATION_UPON_COMPLETION;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_EARLY_DELETE_BACKUP_ENABLED;
Expand Down Expand Up @@ -170,6 +171,9 @@ public class VeniceControllerConfig extends VeniceControllerClusterConfig {
private final long deprecatedJobTopicRetentionMs;
private final long deprecatedJobTopicMaxRetentionMs;
private final long topicCleanupSleepIntervalBetweenTopicListFetchMs;

private final long disabledReplicaEnablerServiceIntervalMs;

private final int topicCleanupDelayFactor;
private final int topicManagerKafkaOperationTimeOutMs;
private final int minNumberOfUnusedKafkaTopicsToPreserve;
Expand Down Expand Up @@ -412,6 +416,8 @@ public VeniceControllerConfig(VeniceProperties props) {
// topicCleanupSleepIntervalBetweenTopicListFetchMs
// = delayBeforeTopicDeletion

this.disabledReplicaEnablerServiceIntervalMs =
props.getLong(CONTROLLER_DISABLED_REPLICA_ENABLER_INTERVAL_MS, TimeUnit.HOURS.toMillis(16));
this.topicManagerKafkaOperationTimeOutMs =
props.getInt(TOPIC_MANAGER_KAFKA_OPERATION_TIMEOUT_MS, 30 * Time.MS_PER_SECOND);

Expand Down Expand Up @@ -629,6 +635,10 @@ public long getTopicCleanupSleepIntervalBetweenTopicListFetchMs() {
return topicCleanupSleepIntervalBetweenTopicListFetchMs;
}

public long getDisabledReplicaEnablerServiceIntervalMs() {
return disabledReplicaEnablerServiceIntervalMs;
}

public int getDaVinciPushStatusScanMaxOfflineInstance() {
return daVinciPushStatusScanMaxOfflineInstance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ public long getTopicCleanupSleepIntervalBetweenTopicListFetchMs() {
return getCommonConfig().getTopicCleanupSleepIntervalBetweenTopicListFetchMs();
}

public long getDisabledReplicaEnablerServiceIntervalMs() {
return getCommonConfig().getDisabledReplicaEnablerServiceIntervalMs();
}

public int getTopicCleanupDelayFactor() {
return getCommonConfig().getTopicCleanupDelayFactor();
}
Expand Down

0 comments on commit 6827575

Please sign in to comment.