From 72ce72e82717c39f72846b0cf3616a45d58be235 Mon Sep 17 00:00:00 2001
From: Kristy Lee <18691669+kristyelee@users.noreply.github.com>
Date: Tue, 19 Nov 2024 14:07:37 -0800
Subject: [PATCH] [server] Dropping unassigned partitions (#1196)

* [server] Remove storage partitions not assigned to the current host. Drop
  the storage engine if all of its partitions have been dropped.

This involves:
1. Updating a function in StorageService to consult the ideal state.
2. Comparing the hostname assignment for each partition.
3. The corresponding call in VeniceServer.
4. A unit test.
5. An integration test.
---
 .../davinci/storage/StorageService.java       | 45 ++++++++++
 .../davinci/storage/StorageServiceTest.java   | 85 +++++++++++++++++++
 .../venice/server/VeniceServerTest.java       | 54 +++++++++---
 .../venice/controller/VeniceHelixAdmin.java   |  6 +-
 .../linkedin/venice/server/VeniceServer.java  |  5 ++
 5 files changed, 183 insertions(+), 12 deletions(-)

diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageService.java
index f6a92a5addd..74a69e054d0 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageService.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageService.java
@@ -17,6 +17,8 @@
 import com.linkedin.venice.ConfigKeys;
 import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.exceptions.VeniceNoStoreException;
+import com.linkedin.venice.helix.SafeHelixDataAccessor;
+import com.linkedin.venice.helix.SafeHelixManager;
 import com.linkedin.venice.kafka.protocol.state.PartitionState;
 import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
 import com.linkedin.venice.meta.PersistenceType;
@@ -42,6 +44,8 @@
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.IdealState;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.rocksdb.RocksDBException;
@@ -371,6 +375,47 @@ public synchronized AbstractStorageEngine openStore(
     return engine;
   }
 
+  public synchronized void checkWhetherStoragePartitionsShouldBeKeptOrNot(SafeHelixManager manager) {
+    if (manager == null) {
+      return;
+    }
+    for (AbstractStorageEngine storageEngine: getStorageEngineRepository().getAllLocalStorageEngines()) {
+      String storeName = storageEngine.getStoreVersionName();
+      Set<Integer> storageEnginePartitionIds = new HashSet<>(storageEngine.getPartitionIds());
+      String instanceHostName = manager.getInstanceName();
+      PropertyKey.Builder propertyKeyBuilder =
+          new PropertyKey.Builder(configLoader.getVeniceClusterConfig().getClusterName());
+      SafeHelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+      IdealState idealState = helixDataAccessor.getProperty(propertyKeyBuilder.idealStates(storeName));
+
+      if (idealState != null) {
+        Map<String, Map<String, String>> mapFields = idealState.getRecord().getMapFields();
+        for (Integer partitionId: storageEnginePartitionIds) {
+          if (storageEngine.isMetadataPartition(partitionId)) {
+            continue;
+          }
+          String partitionDbName = storeName + "_" + partitionId;
+          if (!mapFields.containsKey(partitionDbName)
+              || !mapFields.get(partitionDbName).containsKey(instanceHostName)) {
+            LOGGER.info(
+                "the following partition is not assigned to the current host {} and is being dropped from storage engine {}: {}",
+                instanceHostName,
+                storeName,
+                String.valueOf(partitionId));
+            storageEngine.dropPartition(partitionId);
+          }
+        }
+        if (storageEngine.getPartitionIds().isEmpty()) {
+          LOGGER.info("removing the storage engine {}, which has no partitions", storeName);
+          removeStorageEngine(storeName);
+        }
+      } else {
+        LOGGER.info("removing the storage engine {} as the ideal state is null", storeName);
+        removeStorageEngine(storeName);
+      }
+    }
+  }
+
   /**
    * Drops the partition of the specified store version in the storage service. When all data partitions are dropped,
    * it will also drop the storage engine of the specific store version.
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/StorageServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/StorageServiceTest.java
index a86a4879bed..6f163afff5d 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/StorageServiceTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/StorageServiceTest.java
@@ -1,11 +1,16 @@
 package com.linkedin.davinci.storage;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.linkedin.davinci.config.VeniceClusterConfig;
 import com.linkedin.davinci.config.VeniceConfigLoader;
 import com.linkedin.davinci.config.VeniceServerConfig;
 import com.linkedin.davinci.config.VeniceStoreVersionConfig;
@@ -14,6 +19,8 @@
 import com.linkedin.davinci.store.AbstractStorageEngine;
 import com.linkedin.davinci.store.StorageEngineFactory;
 import com.linkedin.venice.exceptions.VeniceNoStoreException;
+import com.linkedin.venice.helix.SafeHelixDataAccessor;
+import com.linkedin.venice.helix.SafeHelixManager;
 import com.linkedin.venice.kafka.protocol.state.PartitionState;
 import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
 import com.linkedin.venice.meta.PartitionerConfig;
@@ -23,13 +30,21 @@
 import com.linkedin.venice.meta.Version;
 import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
 import com.linkedin.venice.utils.Utils;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.mockito.internal.util.collections.Sets;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -121,4 +136,74 @@ public void testGetStoreAndUserPartitionsMapping() {
     expectedMapping.put(resourceName, partitionSet);
     Assert.assertEquals(storageService.getStoreAndUserPartitionsMapping(), expectedMapping);
   }
+
+  @Test
+  public void testCheckWhetherStoragePartitionsShouldBeKeptOrNot() throws NoSuchFieldException, IllegalAccessException {
+    StorageService mockStorageService = mock(StorageService.class);
+    SafeHelixManager manager = mock(SafeHelixManager.class);
+    StorageEngineRepository mockStorageEngineRepository = mock(StorageEngineRepository.class);
+    AbstractStorageEngine abstractStorageEngine = mock(AbstractStorageEngine.class);
+    mockStorageEngineRepository.addLocalStorageEngine(abstractStorageEngine);
+
+    String resourceName = "test_store_v1";
+    String storeName = "test_store";
+
+    when(abstractStorageEngine.getStoreVersionName()).thenReturn(resourceName);
+    abstractStorageEngine.addStoragePartition(0);
+    abstractStorageEngine.addStoragePartition(1);
+
+    String clusterName = "test_cluster";
+    VeniceConfigLoader mockVeniceConfigLoader = mock(VeniceConfigLoader.class);
+    VeniceClusterConfig mockClusterConfig = mock(VeniceClusterConfig.class);
+    when(mockVeniceConfigLoader.getVeniceClusterConfig()).thenReturn(mockClusterConfig);
+    when(mockVeniceConfigLoader.getVeniceClusterConfig().getClusterName()).thenReturn(clusterName);
+
+    List<AbstractStorageEngine> localStorageEngines = new ArrayList<>();
+    localStorageEngines.add(abstractStorageEngine);
+
+    SafeHelixDataAccessor helixDataAccessor = mock(SafeHelixDataAccessor.class);
+    when(manager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
+    IdealState idealState = mock(IdealState.class);
+    when(helixDataAccessor.getProperty((PropertyKey) any())).thenReturn(idealState);
+    ZNRecord record = new ZNRecord("testId");
+    Map<String, Map<String, String>> mapFields = new HashMap<>();
+    Map<String, String> testPartitionZero = new HashMap<>();
+    Map<String, String> testPartitionOne = new HashMap<>();
+    testPartitionZero.put("host_1430", "LEADER");
+    testPartitionZero.put("host_1435", "STANDBY");
+    testPartitionZero.put("host_1440", "STANDBY");
+    testPartitionOne.put("host_1520", "LEADER");
+    testPartitionOne.put("host_1525", "STANDBY");
+    testPartitionOne.put("host_1530", "STANDBY");
+    mapFields.put("test_store_v1_0", testPartitionZero);
+    mapFields.put("test_store_v1_1", testPartitionOne);
+    record.setMapFields(mapFields);
+    when(idealState.getRecord()).thenReturn(record);
+    when(manager.getInstanceName()).thenReturn("host_1520");
+
+    Set<Integer> partitionSet = new HashSet<>(Arrays.asList(0, 1));
+    when(abstractStorageEngine.getPartitionIds()).thenReturn(partitionSet);
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        int partitionId = invocation.getArgument(0);
+        abstractStorageEngine.getPartitionIds().remove(partitionId);
+        return null;
+      }
+    }).when(abstractStorageEngine).dropPartition(anyInt());
+
+    Field storageEngineRepositoryField = StorageService.class.getDeclaredField("storageEngineRepository");
+    storageEngineRepositoryField.setAccessible(true);
+    storageEngineRepositoryField.set(mockStorageService, mockStorageEngineRepository);
+    when(mockStorageService.getStorageEngineRepository()).thenReturn(mockStorageEngineRepository);
+    when(mockStorageService.getStorageEngineRepository().getAllLocalStorageEngines()).thenReturn(localStorageEngines);
+    Field configLoaderField = StorageService.class.getDeclaredField("configLoader");
+    configLoaderField.setAccessible(true);
+    configLoaderField.set(mockStorageService, mockVeniceConfigLoader);
+
+    doCallRealMethod().when(mockStorageService).checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
+    mockStorageService.checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
+    verify(abstractStorageEngine).dropPartition(0);
+    Assert.assertFalse(abstractStorageEngine.getPartitionIds().contains(0));
+  }
 }
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java
index 2e978c50878..83576ac7ea1 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/server/VeniceServerTest.java
@@ -9,6 +9,7 @@
 import com.linkedin.d2.balancer.D2Client;
 import com.linkedin.davinci.listener.response.ServerCurrentVersionResponse;
 import com.linkedin.davinci.storage.StorageEngineRepository;
+import com.linkedin.davinci.storage.StorageService;
 import com.linkedin.r2.message.rest.RestRequest;
 import com.linkedin.r2.message.rest.RestRequestBuilder;
 import com.linkedin.r2.message.rest.RestResponse;
@@ -49,6 +50,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.Encoder;
@@ -118,16 +120,10 @@ public void testCheckBeforeJoinCluster() {
           .assertTrue(repository.getAllLocalStorageEngines().isEmpty(), "New node should not have any storage engine.");
 
       // Create a storage engine.
-      String storeName = Version.composeKafkaTopic(Utils.getUniqueString("testCheckBeforeJoinCluster"), 1);
-      server.getVeniceServer()
-          .getStorageService()
-          .openStoreForNewPartition(
-              server.getVeniceServer().getConfigLoader().getStoreConfig(storeName),
-              1,
-              () -> null);
-      Assert.assertEquals(
+      String storeName = cluster.createStore(10);
+      Assert.assertNotEquals(
           repository.getAllLocalStorageEngines().size(),
-          1,
+          0,
           "We have created one storage engine for store: " + storeName);
 
       // Restart server, as server's info leave in Helix cluster, so we expect that all local storage would NOT be
@@ -136,7 +132,8 @@ public void testCheckBeforeJoinCluster() {
       cluster.stopVeniceServer(server.getPort());
       cluster.restartVeniceServer(server.getPort());
       repository = server.getVeniceServer().getStorageService().getStorageEngineRepository();
-      Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 1, "We should not cleanup the local storage");
+      Assert
+          .assertNotEquals(repository.getAllLocalStorageEngines().size(), 0, "We should not cleanup the local storage");
 
       // Stop server, remove it from the cluster then restart. We expect that all local storage would be deleted. Once
       // the server join again.
@@ -182,6 +179,43 @@ public void testCheckBeforeJointClusterBeforeHelixInitializingCluster() throws E
     }
   }
 
+  @Test
+  public void testStartServerAndShutdownWithPartitionAssignmentVerification() {
+    try (VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(1, 0, 0)) {
+      Properties featureProperties = new Properties();
+      featureProperties.setProperty(SERVER_ENABLE_SERVER_ALLOW_LIST, Boolean.toString(true));
+      featureProperties.setProperty(SERVER_IS_AUTO_JOIN, Boolean.toString(true));
+      cluster.addVeniceServer(featureProperties, new Properties());
+      VeniceServerWrapper server = cluster.getVeniceServers().get(0);
+      Assert.assertTrue(server.getVeniceServer().isStarted());
+      StorageService storageService = server.getVeniceServer().getStorageService();
+      StorageEngineRepository repository = storageService.getStorageEngineRepository();
+      Assert
+          .assertTrue(repository.getAllLocalStorageEngines().isEmpty(), "New node should not have any storage engine.");
+
+      // Create a storage engine.
+      String storeName = Version.composeKafkaTopic(cluster.createStore(1), 1);
+      Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 1);
+      Assert.assertTrue(server.getVeniceServer().getHelixParticipationService().isRunning());
+      Assert.assertEquals(storageService.getStorageEngine(storeName).getPartitionIds().size(), 3);
+
+      cluster.stopVeniceServer(server.getPort());
+
+      // Create new servers so partition assignment is removed for the offline participant
+      cluster.addVeniceServer(featureProperties, new Properties());
+      cluster.addVeniceServer(featureProperties, new Properties());
+
+      cluster.restartVeniceServer(server.getPort());
+      StorageEngineRepository storageEngineRepository =
+          server.getVeniceServer().getStorageService().getStorageEngineRepository();
+
+      TestUtils.waitForNonDeterministicAssertion(
+          3,
+          TimeUnit.SECONDS,
+          () -> Assert.assertEquals(storageEngineRepository.getAllLocalStorageEngines().size(), 0));
+    }
+  }
+
   @Test
   public void testMetadataFetchRequest() throws ExecutionException, InterruptedException, IOException {
     Utils.thisIsLocalhost();
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java
index 24faf1dedf2..209dd2ce6d2 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java
@@ -6363,8 +6363,10 @@ private void createClusterIfRequired(String clusterName) {
       helixClusterProperties
           .put(ClusterConfig.ClusterConfigProperty.DELAY_REBALANCE_TIME.name(), String.valueOf(delayedTime));
     }
-    // Topology and fault zone type fields are used by CRUSH/CRUSHED/WAGED/etc alg. Helix would apply the constrains on
-    // these alg to choose proper instance to hold the replica.
+    helixClusterProperties
+        .put(ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(), String.valueOf(true));
+    // Topology and fault zone type fields are used by CRUSH alg. Helix would apply the constraints on CRUSH alg to
+    // choose proper instance to hold the replica.
     helixClusterProperties
         .put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), "/" + HelixUtils.TOPOLOGY_CONSTRAINT);
     helixClusterProperties
diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java
index 2611f92decf..18f6f50507f 100644
--- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java
+++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java
@@ -356,6 +356,11 @@ private List<AbstractVeniceService> createServices() {
       return helixData;
     });
 
+    managerFuture.thenApply(manager -> {
+      storageService.checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
+      return true;
+    });
+
     heartbeatMonitoringService = new HeartbeatMonitoringService(
         metricsRepository,
         metadataRepo,
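
Reviewer note (not part of the patch): below is a minimal sketch of the per-partition decision that checkWhetherStoragePartitionsShouldBeKeptOrNot applies in the StorageService change above. It assumes, as the unit test's ZNRecord fixture suggests, that the ideal state's map fields are keyed by "<resource>_<partitionId>" and map each assigned host to its replica state. The class and method names here are hypothetical and only illustrate the rule: a locally hosted user partition is kept only when the ideal state lists the current instance for it. In the patch itself, this check runs once the Helix manager future completes in VeniceServer, and the whole storage engine is removed when no data partitions remain.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, for illustration only; not part of this change.
public final class PartitionRetentionSketch {
  private PartitionRetentionSketch() {
  }

  /**
   * Returns the user partitions that the current host should drop, i.e. those that are either
   * absent from the ideal state or not assigned to this instance. The metadata partition is
   * assumed to be filtered out by the caller, mirroring the isMetadataPartition() skip in the
   * patched StorageService method.
   */
  public static Set<Integer> partitionsToDrop(
      String storeVersionName,
      Set<Integer> localUserPartitionIds,
      Map<String, Map<String, String>> idealStateMapFields,
      String instanceHostName) {
    Set<Integer> toDrop = new HashSet<>();
    for (Integer partitionId: localUserPartitionIds) {
      String partitionDbName = storeVersionName + "_" + partitionId;
      Map<String, String> assignment = idealStateMapFields.get(partitionDbName);
      if (assignment == null || !assignment.containsKey(instanceHostName)) {
        toDrop.add(partitionId);
      }
    }
    return toDrop;
  }
}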