From fa433de4b189715bbdd2b640fc7874f99c6034d2 Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Tue, 5 Dec 2023 18:36:22 -0800 Subject: [PATCH] [controller] Clear disabled partition metric on server restarts (#774) * [controller] Clear disabled partition metric on server restarts The metric for disabled partition only counts whenever a partition is disabled, but does not count down whenever its reenabled. This PR fixes that. --------- Co-authored-by: Sourav Maji --- .../controller/HelixVeniceClusterResources.java | 2 +- .../venice/controller/VeniceHelixAdmin.java | 13 +++++++++++++ .../controller/stats/DisabledPartitionStats.java | 9 ++++++--- .../venice/pushmonitor/AbstractPushMonitor.java | 9 ++++----- .../PartitionStatusBasedPushMonitor.java | 6 +++--- .../venice/pushmonitor/PushMonitorDelegator.java | 6 +++--- .../venice/controller/TestVeniceHelixAdmin.java | 2 ++ .../PartitionStatusBasedPushMonitorTest.java | 5 +++-- 8 files changed, 35 insertions(+), 17 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java index 9724523826..10ac24fa5d 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java @@ -167,7 +167,7 @@ public HelixVeniceClusterResources( helixAdminClient, config, admin.getPushStatusStoreReader(), - metricsRepository); + admin.getDisabledPartitionStats(clusterName)); this.leakedPushStatusCleanUpService = new LeakedPushStatusCleanUpService( clusterName, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index f2f18df3f0..506b7390d2 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -57,6 +57,7 @@ import com.linkedin.venice.controller.kafka.consumer.AdminConsumerService; import com.linkedin.venice.controller.kafka.protocol.admin.HybridStoreConfigRecord; import com.linkedin.venice.controller.kafka.protocol.admin.StoreViewConfigRecord; +import com.linkedin.venice.controller.stats.DisabledPartitionStats; import com.linkedin.venice.controllerapi.ControllerClient; import com.linkedin.venice.controllerapi.ControllerResponse; import com.linkedin.venice.controllerapi.ControllerRoute; @@ -382,6 +383,8 @@ public class VeniceHelixAdmin implements Admin, StoreCleaner { // Those variables will be initialized lazily. private int pushJobDetailsSchemaId = -1; + private final Map disabledPartitionStatMap = new HashMap<>(); + private static final String PUSH_JOB_DETAILS_WRITER = "PUSH_JOB_DETAILS_WRITER"; private final Map jobTrackingVeniceWriterMap = new VeniceConcurrentHashMap<>(); @@ -633,6 +636,7 @@ public VeniceHelixAdmin( } else { createControllerClusterIfRequired(); } + controllerStateModelFactory = new VeniceDistClusterControllerStateModelFactory( zkClient, adapterSerializer, @@ -649,6 +653,8 @@ public VeniceHelixAdmin( continue; } HelixLiveInstanceMonitor liveInstanceMonitor = new HelixLiveInstanceMonitor(this.zkClient, clusterName); + DisabledPartitionStats disabledPartitionStats = new DisabledPartitionStats(metricsRepository, clusterName); + disabledPartitionStatMap.put(clusterName, disabledPartitionStats); liveInstanceMonitorMap.put(clusterName, liveInstanceMonitor); // Register new instance callback liveInstanceMonitor.registerLiveInstanceChangedListener(new LiveInstanceChangedListener() { @@ -661,6 +667,7 @@ public void handleNewInstances(Set newInstances) { for (Map.Entry> entry: disabledPartitions.entrySet()) { helixAdminClient .enablePartition(true, clusterName, instance.getNodeId(), entry.getKey(), entry.getValue()); + disabledPartitionStats.recordClearDisabledPartition(); LOGGER.info("Enabled disabled replica of resource {}, partitions {}", entry.getKey(), entry.getValue()); } } @@ -4869,6 +4876,7 @@ public void deleteHelixResource(String clusterName, String kafkaTopic) { if (entry.getKey().equals(kafkaTopic)) { // clean up disabled partition map, so that it does not grow indefinitely with dropped resources getHelixAdminClient().enablePartition(true, clusterName, instance, kafkaTopic, entry.getValue()); + getDisabledPartitionStats(clusterName).recordClearDisabledPartition(); LOGGER.info("Cleaning up disabled replica of resource {}, partitions {}", entry.getKey(), entry.getValue()); } } @@ -5341,6 +5349,10 @@ public HelixAdminClient getHelixAdminClient() { return helixAdminClient; } + public DisabledPartitionStats getDisabledPartitionStats(String clusterName) { + return disabledPartitionStatMap.get(clusterName); + } + /** * @return a map containing the storage node name and its connectivity status (InstanceStatus). */ @@ -5361,6 +5373,7 @@ public Map getStorageNodesStatus(String clusterName, boolean ena Map> disabledPartitions = helixAdminClient.getDisabledPartitionsMap(clusterName, instance); for (Map.Entry> entry: disabledPartitions.entrySet()) { helixAdminClient.enablePartition(true, clusterName, instance, entry.getKey(), entry.getValue()); + getDisabledPartitionStats(clusterName).recordClearDisabledPartition(); LOGGER.info( "Enabled disabled replica of resource {}, partitions {} in cluster {}", entry.getKey(), diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/DisabledPartitionStats.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/DisabledPartitionStats.java index 9bf9ac0c33..dc9395e6f2 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/DisabledPartitionStats.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/DisabledPartitionStats.java @@ -3,7 +3,7 @@ import com.linkedin.venice.stats.AbstractVeniceStats; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; -import io.tehuti.metrics.stats.Count; +import io.tehuti.metrics.stats.Total; public class DisabledPartitionStats extends AbstractVeniceStats { @@ -11,11 +11,14 @@ public class DisabledPartitionStats extends AbstractVeniceStats { public DisabledPartitionStats(MetricsRepository metricsRepository, String name) { super(metricsRepository, name); - disabledPartitionCount = registerSensorIfAbsent("disabled_partition_count", new Count()); + disabledPartitionCount = registerSensorIfAbsent("disabled_partition_count", new Total()); } public void recordDisabledPartition() { - disabledPartitionCount.record(); + disabledPartitionCount.record(1); } + public void recordClearDisabledPartition() { + disabledPartitionCount.record(-1); + } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index cb52bc5ab9..d92e96fe77 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -34,7 +34,6 @@ import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.locks.AutoCloseableLock; import com.linkedin.venice.utils.locks.ClusterLockManager; -import io.tehuti.metrics.MetricsRepository; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -85,10 +84,10 @@ public abstract class AbstractPushMonitor private final long offlineJobResourceAssignmentWaitTimeInMilliseconds; private final PushStatusCollector pushStatusCollector; - private final DisabledPartitionStats disabledPartitionStats; - private final boolean isOfflinePushMonitorDaVinciPushStatusEnabled; + private final DisabledPartitionStats disabledPartitionStats; + public AbstractPushMonitor( String clusterName, OfflinePushAccessor offlinePushAccessor, @@ -103,7 +102,7 @@ public AbstractPushMonitor( HelixAdminClient helixAdminClient, VeniceControllerConfig controllerConfig, PushStatusStoreReader pushStatusStoreReader, - MetricsRepository metricsRepository) { + DisabledPartitionStats disabledPartitionStats) { this.clusterName = clusterName; this.offlinePushAccessor = offlinePushAccessor; this.storeCleaner = storeCleaner; @@ -115,7 +114,7 @@ public AbstractPushMonitor( this.aggregateRealTimeSourceKafkaUrl = aggregateRealTimeSourceKafkaUrl; this.activeActiveRealTimeSourceKafkaURLs = activeActiveRealTimeSourceKafkaURLs; this.helixAdminClient = helixAdminClient; - this.disabledPartitionStats = new DisabledPartitionStats(metricsRepository, clusterName); + this.disabledPartitionStats = disabledPartitionStats; this.disableErrorLeaderReplica = controllerConfig.isErrorLeaderReplicaFailOverEnabled(); this.helixClientThrottler = diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java index 1b5fc346a1..b9191897b8 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitor.java @@ -2,6 +2,7 @@ import com.linkedin.venice.controller.HelixAdminClient; import com.linkedin.venice.controller.VeniceControllerConfig; +import com.linkedin.venice.controller.stats.DisabledPartitionStats; import com.linkedin.venice.ingestion.control.RealTimeTopicSwitcher; import com.linkedin.venice.meta.Instance; import com.linkedin.venice.meta.PartitionAssignment; @@ -10,7 +11,6 @@ import com.linkedin.venice.meta.StoreCleaner; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; import com.linkedin.venice.utils.locks.ClusterLockManager; -import io.tehuti.metrics.MetricsRepository; import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -38,7 +38,7 @@ public PartitionStatusBasedPushMonitor( HelixAdminClient helixAdminClient, VeniceControllerConfig controllerConfig, PushStatusStoreReader pushStatusStoreReader, - MetricsRepository metricsRepository) { + DisabledPartitionStats disabledPartitionStats) { super( clusterName, offlinePushAccessor, @@ -53,7 +53,7 @@ public PartitionStatusBasedPushMonitor( helixAdminClient, controllerConfig, pushStatusStoreReader, - metricsRepository); + disabledPartitionStats); } @Override diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java index 72e4b05fe6..07dbc12f27 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorDelegator.java @@ -2,6 +2,7 @@ import com.linkedin.venice.controller.HelixAdminClient; import com.linkedin.venice.controller.VeniceControllerConfig; +import com.linkedin.venice.controller.stats.DisabledPartitionStats; import com.linkedin.venice.exceptions.VeniceNoStoreException; import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository; import com.linkedin.venice.ingestion.control.RealTimeTopicSwitcher; @@ -18,7 +19,6 @@ import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.locks.AutoCloseableLock; import com.linkedin.venice.utils.locks.ClusterLockManager; -import io.tehuti.metrics.MetricsRepository; import java.util.List; import java.util.Map; import java.util.Optional; @@ -57,7 +57,7 @@ public PushMonitorDelegator( HelixAdminClient helixAdminClient, VeniceControllerConfig controllerConfig, PushStatusStoreReader pushStatusStoreReader, - MetricsRepository metricsRepository) { + DisabledPartitionStats disabledPartitionStats) { this.clusterName = clusterName; this.metadataRepository = metadataRepository; @@ -75,7 +75,7 @@ public PushMonitorDelegator( helixAdminClient, controllerConfig, pushStatusStoreReader, - metricsRepository); + disabledPartitionStats); this.clusterLockManager = clusterLockManager; this.topicToPushMonitorMap = new VeniceConcurrentHashMap<>(); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java index 4541920c40..72bc0ea3fd 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java @@ -3,6 +3,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.*; +import com.linkedin.venice.controller.stats.DisabledPartitionStats; import com.linkedin.venice.helix.HelixExternalViewRepository; import com.linkedin.venice.meta.PartitionAssignment; import com.linkedin.venice.meta.ReadWriteStoreRepository; @@ -44,6 +45,7 @@ public void testDropResources() { doReturn(repository).when(veniceClusterResources).getRoutingDataRepository(); doReturn(nodes).when(veniceHelixAdmin).getStorageNodes(anyString()); doReturn(partitionAssignment).when(repository).getPartitionAssignments(anyString()); + doReturn(mock(DisabledPartitionStats.class)).when(veniceHelixAdmin).getDisabledPartitionStats(anyString()); doCallRealMethod().when(veniceHelixAdmin).deleteHelixResource(anyString(), anyString()); veniceHelixAdmin.deleteHelixResource(clusterName, kafkaTopic); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java index 4ac7907483..574256bd7c 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java @@ -15,6 +15,7 @@ import static org.mockito.Mockito.when; import com.linkedin.venice.controller.HelixAdminClient; +import com.linkedin.venice.controller.stats.DisabledPartitionStats; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.CachedReadOnlyStoreRepository; import com.linkedin.venice.helix.HelixState; @@ -62,7 +63,7 @@ protected AbstractPushMonitor getPushMonitor(StoreCleaner storeCleaner) { helixAdminClient, getMockControllerConfig(), null, - mockMetricRepo); + mock(DisabledPartitionStats.class)); } @Override @@ -81,7 +82,7 @@ protected AbstractPushMonitor getPushMonitor(RealTimeTopicSwitcher mockRealTimeT mock(HelixAdminClient.class), getMockControllerConfig(), null, - mockMetricRepo); + mock(DisabledPartitionStats.class)); } @Test