Skip to content

Commit

Permalink
[controller] Clear disabled partition metric on server restarts (#774)
Browse files Browse the repository at this point in the history
* [controller] Clear disabled partition metric on server restarts

The metric for disabled partitions only counts up whenever a partition is disabled, but does not count down when a partition is re-enabled. This PR fixes that.
---------

Co-authored-by: Sourav Maji <[email protected]>
  • Loading branch information
majisourav99 and Sourav Maji authored Dec 6, 2023
1 parent 8bc2190 commit fa433de
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public HelixVeniceClusterResources(
helixAdminClient,
config,
admin.getPushStatusStoreReader(),
metricsRepository);
admin.getDisabledPartitionStats(clusterName));

this.leakedPushStatusCleanUpService = new LeakedPushStatusCleanUpService(
clusterName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.linkedin.venice.controller.kafka.consumer.AdminConsumerService;
import com.linkedin.venice.controller.kafka.protocol.admin.HybridStoreConfigRecord;
import com.linkedin.venice.controller.kafka.protocol.admin.StoreViewConfigRecord;
import com.linkedin.venice.controller.stats.DisabledPartitionStats;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.ControllerRoute;
Expand Down Expand Up @@ -382,6 +383,8 @@ public class VeniceHelixAdmin implements Admin, StoreCleaner {
// Those variables will be initialized lazily.
private int pushJobDetailsSchemaId = -1;

private final Map<String, DisabledPartitionStats> disabledPartitionStatMap = new HashMap<>();

private static final String PUSH_JOB_DETAILS_WRITER = "PUSH_JOB_DETAILS_WRITER";
private final Map<String, VeniceWriter> jobTrackingVeniceWriterMap = new VeniceConcurrentHashMap<>();

Expand Down Expand Up @@ -633,6 +636,7 @@ public VeniceHelixAdmin(
} else {
createControllerClusterIfRequired();
}

controllerStateModelFactory = new VeniceDistClusterControllerStateModelFactory(
zkClient,
adapterSerializer,
Expand All @@ -649,6 +653,8 @@ public VeniceHelixAdmin(
continue;
}
HelixLiveInstanceMonitor liveInstanceMonitor = new HelixLiveInstanceMonitor(this.zkClient, clusterName);
DisabledPartitionStats disabledPartitionStats = new DisabledPartitionStats(metricsRepository, clusterName);
disabledPartitionStatMap.put(clusterName, disabledPartitionStats);
liveInstanceMonitorMap.put(clusterName, liveInstanceMonitor);
// Register new instance callback
liveInstanceMonitor.registerLiveInstanceChangedListener(new LiveInstanceChangedListener() {
Expand All @@ -661,6 +667,7 @@ public void handleNewInstances(Set<Instance> newInstances) {
for (Map.Entry<String, List<String>> entry: disabledPartitions.entrySet()) {
helixAdminClient
.enablePartition(true, clusterName, instance.getNodeId(), entry.getKey(), entry.getValue());
disabledPartitionStats.recordClearDisabledPartition();
LOGGER.info("Enabled disabled replica of resource {}, partitions {}", entry.getKey(), entry.getValue());
}
}
Expand Down Expand Up @@ -4869,6 +4876,7 @@ public void deleteHelixResource(String clusterName, String kafkaTopic) {
if (entry.getKey().equals(kafkaTopic)) {
// clean up disabled partition map, so that it does not grow indefinitely with dropped resources
getHelixAdminClient().enablePartition(true, clusterName, instance, kafkaTopic, entry.getValue());
getDisabledPartitionStats(clusterName).recordClearDisabledPartition();
LOGGER.info("Cleaning up disabled replica of resource {}, partitions {}", entry.getKey(), entry.getValue());
}
}
Expand Down Expand Up @@ -5341,6 +5349,10 @@ public HelixAdminClient getHelixAdminClient() {
return helixAdminClient;
}

/**
 * Returns the per-cluster {@link DisabledPartitionStats} instance that was created and
 * registered in {@code disabledPartitionStatMap} during controller initialization.
 *
 * @param clusterName name of the Venice cluster whose stats object is requested
 * @return the stats object for {@code clusterName}; may be {@code null} if no entry was
 *         registered for this cluster — NOTE(review): callers invoke
 *         {@code recordClearDisabledPartition()} directly on the result, so confirm the
 *         cluster is always initialized before this is called.
 */
public DisabledPartitionStats getDisabledPartitionStats(String clusterName) {
return disabledPartitionStatMap.get(clusterName);
}

/**
* @return a map containing the storage node name and its connectivity status (<code>InstanceStatus</code>).
*/
Expand All @@ -5361,6 +5373,7 @@ public Map<String, String> getStorageNodesStatus(String clusterName, boolean ena
Map<String, List<String>> disabledPartitions = helixAdminClient.getDisabledPartitionsMap(clusterName, instance);
for (Map.Entry<String, List<String>> entry: disabledPartitions.entrySet()) {
helixAdminClient.enablePartition(true, clusterName, instance, entry.getKey(), entry.getValue());
getDisabledPartitionStats(clusterName).recordClearDisabledPartition();
LOGGER.info(
"Enabled disabled replica of resource {}, partitions {} in cluster {}",
entry.getKey(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@
import com.linkedin.venice.stats.AbstractVeniceStats;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import io.tehuti.metrics.stats.Count;
import io.tehuti.metrics.stats.Total;


/**
 * Emits a gauge-like metric ("disabled_partition_count") tracking how many partitions are
 * currently disabled in a cluster. The sensor uses a {@link io.tehuti.metrics.stats.Total}
 * accumulator rather than a {@code Count}, so the value can be decremented: it is
 * incremented when a partition is disabled and decremented when a partition is re-enabled
 * (e.g. on server restarts), keeping the reported value equal to the number of partitions
 * that are disabled right now instead of growing monotonically.
 */
public class DisabledPartitionStats extends AbstractVeniceStats {
  // Running total of currently-disabled partitions; Total supports negative deltas.
  private final Sensor disabledPartitionCount;

  /**
   * @param metricsRepository repository the sensor is registered with
   * @param name stats name, expected to be the cluster name
   */
  public DisabledPartitionStats(MetricsRepository metricsRepository, String name) {
    super(metricsRepository, name);
    disabledPartitionCount = registerSensorIfAbsent("disabled_partition_count", new Total());
  }

  /** Records that one partition was disabled (+1 to the running total). */
  public void recordDisabledPartition() {
    disabledPartitionCount.record(1);
  }

  /** Records that one previously-disabled partition was re-enabled (-1 from the running total). */
  public void recordClearDisabledPartition() {
    disabledPartitionCount.record(-1);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.utils.locks.ClusterLockManager;
import io.tehuti.metrics.MetricsRepository;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -85,10 +84,10 @@ public abstract class AbstractPushMonitor
private final long offlineJobResourceAssignmentWaitTimeInMilliseconds;

private final PushStatusCollector pushStatusCollector;
private final DisabledPartitionStats disabledPartitionStats;

private final boolean isOfflinePushMonitorDaVinciPushStatusEnabled;

private final DisabledPartitionStats disabledPartitionStats;

public AbstractPushMonitor(
String clusterName,
OfflinePushAccessor offlinePushAccessor,
Expand All @@ -103,7 +102,7 @@ public AbstractPushMonitor(
HelixAdminClient helixAdminClient,
VeniceControllerConfig controllerConfig,
PushStatusStoreReader pushStatusStoreReader,
MetricsRepository metricsRepository) {
DisabledPartitionStats disabledPartitionStats) {
this.clusterName = clusterName;
this.offlinePushAccessor = offlinePushAccessor;
this.storeCleaner = storeCleaner;
Expand All @@ -115,7 +114,7 @@ public AbstractPushMonitor(
this.aggregateRealTimeSourceKafkaUrl = aggregateRealTimeSourceKafkaUrl;
this.activeActiveRealTimeSourceKafkaURLs = activeActiveRealTimeSourceKafkaURLs;
this.helixAdminClient = helixAdminClient;
this.disabledPartitionStats = new DisabledPartitionStats(metricsRepository, clusterName);
this.disabledPartitionStats = disabledPartitionStats;

this.disableErrorLeaderReplica = controllerConfig.isErrorLeaderReplicaFailOverEnabled();
this.helixClientThrottler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.venice.controller.HelixAdminClient;
import com.linkedin.venice.controller.VeniceControllerConfig;
import com.linkedin.venice.controller.stats.DisabledPartitionStats;
import com.linkedin.venice.ingestion.control.RealTimeTopicSwitcher;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.PartitionAssignment;
Expand All @@ -10,7 +11,6 @@
import com.linkedin.venice.meta.StoreCleaner;
import com.linkedin.venice.pushstatushelper.PushStatusStoreReader;
import com.linkedin.venice.utils.locks.ClusterLockManager;
import io.tehuti.metrics.MetricsRepository;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -38,7 +38,7 @@ public PartitionStatusBasedPushMonitor(
HelixAdminClient helixAdminClient,
VeniceControllerConfig controllerConfig,
PushStatusStoreReader pushStatusStoreReader,
MetricsRepository metricsRepository) {
DisabledPartitionStats disabledPartitionStats) {
super(
clusterName,
offlinePushAccessor,
Expand All @@ -53,7 +53,7 @@ public PartitionStatusBasedPushMonitor(
helixAdminClient,
controllerConfig,
pushStatusStoreReader,
metricsRepository);
disabledPartitionStats);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.venice.controller.HelixAdminClient;
import com.linkedin.venice.controller.VeniceControllerConfig;
import com.linkedin.venice.controller.stats.DisabledPartitionStats;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository;
import com.linkedin.venice.ingestion.control.RealTimeTopicSwitcher;
Expand All @@ -18,7 +19,6 @@
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.utils.locks.ClusterLockManager;
import io.tehuti.metrics.MetricsRepository;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -57,7 +57,7 @@ public PushMonitorDelegator(
HelixAdminClient helixAdminClient,
VeniceControllerConfig controllerConfig,
PushStatusStoreReader pushStatusStoreReader,
MetricsRepository metricsRepository) {
DisabledPartitionStats disabledPartitionStats) {
this.clusterName = clusterName;
this.metadataRepository = metadataRepository;

Expand All @@ -75,7 +75,7 @@ public PushMonitorDelegator(
helixAdminClient,
controllerConfig,
pushStatusStoreReader,
metricsRepository);
disabledPartitionStats);
this.clusterLockManager = clusterLockManager;

this.topicToPushMonitorMap = new VeniceConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.*;

import com.linkedin.venice.controller.stats.DisabledPartitionStats;
import com.linkedin.venice.helix.HelixExternalViewRepository;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.ReadWriteStoreRepository;
Expand Down Expand Up @@ -44,6 +45,7 @@ public void testDropResources() {
doReturn(repository).when(veniceClusterResources).getRoutingDataRepository();
doReturn(nodes).when(veniceHelixAdmin).getStorageNodes(anyString());
doReturn(partitionAssignment).when(repository).getPartitionAssignments(anyString());
doReturn(mock(DisabledPartitionStats.class)).when(veniceHelixAdmin).getDisabledPartitionStats(anyString());
doCallRealMethod().when(veniceHelixAdmin).deleteHelixResource(anyString(), anyString());

veniceHelixAdmin.deleteHelixResource(clusterName, kafkaTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.mockito.Mockito.when;

import com.linkedin.venice.controller.HelixAdminClient;
import com.linkedin.venice.controller.stats.DisabledPartitionStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.CachedReadOnlyStoreRepository;
import com.linkedin.venice.helix.HelixState;
Expand Down Expand Up @@ -62,7 +63,7 @@ protected AbstractPushMonitor getPushMonitor(StoreCleaner storeCleaner) {
helixAdminClient,
getMockControllerConfig(),
null,
mockMetricRepo);
mock(DisabledPartitionStats.class));
}

@Override
Expand All @@ -81,7 +82,7 @@ protected AbstractPushMonitor getPushMonitor(RealTimeTopicSwitcher mockRealTimeT
mock(HelixAdminClient.class),
getMockControllerConfig(),
null,
mockMetricRepo);
mock(DisabledPartitionStats.class));
}

@Test
Expand Down

0 comments on commit fa433de

Please sign in to comment.