Skip to content

Commit

Permalink
[server] Dropping unassigned partitions (#1196)
Browse files Browse the repository at this point in the history
* [server] Remove storage partitions not assigned to current host. Drop storage engine if all partitions have been dropped.

This involves:
1. Updating a function in StorageService to consult ideal state.
2. Hostname comparison for each partition.
3. Correct call in VeniceServer
4. Unit test
5. Integration test
  • Loading branch information
kristyelee authored Nov 19, 2024
1 parent 25f011d commit 72ce72e
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.helix.SafeHelixDataAccessor;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.meta.PersistenceType;
Expand All @@ -42,6 +44,8 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.IdealState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.rocksdb.RocksDBException;
Expand Down Expand Up @@ -371,6 +375,47 @@ public synchronized AbstractStorageEngine openStore(
return engine;
}

/**
 * Reconciles the partitions held by every local storage engine with the Helix ideal state.
 *
 * <p>For each local storage engine, the ideal state of the resource named after the engine's store version is
 * fetched. Every non-metadata partition whose ideal-state entry does not list this instance is dropped. The
 * storage engine itself is removed when it ends up with no partitions, or when no ideal state exists for it.
 *
 * @param manager Helix manager used to resolve the current instance name and to read ideal states; when
 *                {@code null} this method does nothing.
 */
public synchronized void checkWhetherStoragePartitionsShouldBeKeptOrNot(SafeHelixManager manager) {
  if (manager == null) {
    return;
  }
  // None of these depend on the storage engine being inspected, so resolve them once up front.
  String instanceHostName = manager.getInstanceName();
  PropertyKey.Builder propertyKeyBuilder =
      new PropertyKey.Builder(configLoader.getVeniceClusterConfig().getClusterName());
  SafeHelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();

  for (AbstractStorageEngine storageEngine: getStorageEngineRepository().getAllLocalStorageEngines()) {
    String storeName = storageEngine.getStoreVersionName();
    IdealState idealState = helixDataAccessor.getProperty(propertyKeyBuilder.idealStates(storeName));
    if (idealState == null) {
      LOGGER.info("removing the storage engine {} as the ideal state is null", storeName);
      removeStorageEngine(storeName);
      continue;
    }

    // Ideal state map fields: partition name ("<storeVersion>_<partitionId>") -> (instance name -> state).
    Map<String, Map<String, String>> mapFields = idealState.getRecord().getMapFields();
    // Iterate over a snapshot so that dropping a partition cannot disturb the iteration.
    Set<Integer> storageEnginePartitionIds = new HashSet<>(storageEngine.getPartitionIds());
    for (Integer partitionId: storageEnginePartitionIds) {
      if (storageEngine.isMetadataPartition(partitionId)) {
        continue;
      }
      String partitionDbName = storeName + "_" + partitionId;
      // A missing (or null) entry means the ideal state does not place this partition on any known instance.
      Map<String, String> instanceStates = mapFields.get(partitionDbName);
      boolean assignedToThisHost = instanceStates != null && instanceStates.containsKey(instanceHostName);
      if (!assignedToThisHost) {
        LOGGER.info(
            "the following partition is not assigned to the current host {} and is being dropped from storage engine {}: {}",
            instanceHostName,
            storeName,
            partitionId);
        storageEngine.dropPartition(partitionId);
      }
    }
    // Re-read the live partition ids: the snapshot above does not reflect the drops just performed.
    if (storageEngine.getPartitionIds().isEmpty()) {
      LOGGER.info("removing the storage engine {}, which has no partitions", storeName);
      removeStorageEngine(storeName);
    }
  }
}

/**
* Drops the partition of the specified store version in the storage service. When all data partitions are dropped,
* it will also drop the storage engine of the specific store version.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package com.linkedin.davinci.storage;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.linkedin.davinci.config.VeniceClusterConfig;
import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
Expand All @@ -14,6 +19,8 @@
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.StorageEngineFactory;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.helix.SafeHelixDataAccessor;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.meta.PartitionerConfig;
Expand All @@ -23,13 +30,21 @@
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.utils.Utils;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.IdealState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.mockito.internal.util.collections.Sets;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -121,4 +136,74 @@ public void testGetStoreAndUserPartitionsMapping() {
expectedMapping.put(resourceName, partitionSet);
Assert.assertEquals(storageService.getStoreAndUserPartitionsMapping(), expectedMapping);
}

/**
 * Verifies that {@code checkWhetherStoragePartitionsShouldBeKeptOrNot} drops a partition whose ideal-state entry
 * does not list the current instance, and keeps a partition whose entry does.
 */
@Test
public void testCheckWhetherStoragePartitionsShouldBeKeptOrNot() throws NoSuchFieldException, IllegalAccessException {
  String resourceName = "test_store_v1";
  String clusterName = "test_cluster";

  // Storage engine holding partitions 0 and 1; dropPartition removes the id from the backing set.
  AbstractStorageEngine abstractStorageEngine = mock(AbstractStorageEngine.class);
  when(abstractStorageEngine.getStoreVersionName()).thenReturn(resourceName);
  Set<Integer> partitionSet = new HashSet<>(Arrays.asList(0, 1));
  when(abstractStorageEngine.getPartitionIds()).thenReturn(partitionSet);
  doAnswer(new Answer<Void>() {
    @Override
    public Void answer(InvocationOnMock invocation) {
      Integer partitionId = invocation.getArgument(0);
      partitionSet.remove(partitionId);
      return null;
    }
  }).when(abstractStorageEngine).dropPartition(anyInt());

  List<AbstractStorageEngine> localStorageEngines = new ArrayList<>();
  localStorageEngines.add(abstractStorageEngine);
  StorageEngineRepository mockStorageEngineRepository = mock(StorageEngineRepository.class);
  when(mockStorageEngineRepository.getAllLocalStorageEngines()).thenReturn(localStorageEngines);

  // Cluster config only needs to provide the cluster name used to build the ideal state property key.
  VeniceConfigLoader mockVeniceConfigLoader = mock(VeniceConfigLoader.class);
  VeniceClusterConfig mockClusterConfig = mock(VeniceClusterConfig.class);
  when(mockVeniceConfigLoader.getVeniceClusterConfig()).thenReturn(mockClusterConfig);
  when(mockClusterConfig.getClusterName()).thenReturn(clusterName);

  // Ideal state: partition 0 lives on other hosts, partition 1 is led by the current host (host_1520).
  Map<String, String> testPartitionZero = new HashMap<>();
  testPartitionZero.put("host_1430", "LEADER");
  testPartitionZero.put("host_1435", "STANDBY");
  testPartitionZero.put("host_1440", "STANDBY");
  Map<String, String> testPartitionOne = new HashMap<>();
  testPartitionOne.put("host_1520", "LEADER");
  testPartitionOne.put("host_1525", "STANDBY");
  testPartitionOne.put("host_1530", "STANDBY");
  Map<String, Map<String, String>> mapFields = new HashMap<>();
  mapFields.put("test_store_v1_0", testPartitionZero);
  mapFields.put("test_store_v1_1", testPartitionOne);
  ZNRecord record = new ZNRecord("testId");
  record.setMapFields(mapFields);
  IdealState idealState = mock(IdealState.class);
  when(idealState.getRecord()).thenReturn(record);

  SafeHelixDataAccessor helixDataAccessor = mock(SafeHelixDataAccessor.class);
  when(helixDataAccessor.getProperty((PropertyKey) any())).thenReturn(idealState);
  SafeHelixManager manager = mock(SafeHelixManager.class);
  when(manager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
  when(manager.getInstanceName()).thenReturn("host_1520");

  // The service is a mock with only the method under test made real, so its collaborators are injected
  // through stubbing and reflection rather than through the constructor.
  StorageService mockStorageService = mock(StorageService.class);
  Field storageEngineRepositoryField = StorageService.class.getDeclaredField("storageEngineRepository");
  storageEngineRepositoryField.setAccessible(true);
  storageEngineRepositoryField.set(mockStorageService, mockStorageEngineRepository);
  when(mockStorageService.getStorageEngineRepository()).thenReturn(mockStorageEngineRepository);
  Field configLoaderField = StorageService.class.getDeclaredField("configLoader");
  configLoaderField.setAccessible(true);
  configLoaderField.set(mockStorageService, mockVeniceConfigLoader);

  doCallRealMethod().when(mockStorageService).checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
  mockStorageService.checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);

  // Partition 0 is not assigned to host_1520 and must be dropped.
  verify(abstractStorageEngine).dropPartition(0);
  Assert.assertFalse(abstractStorageEngine.getPartitionIds().contains(0));
  // Partition 1 is assigned to host_1520 and must be left untouched.
  verify(abstractStorageEngine, times(0)).dropPartition(1);
  Assert.assertTrue(abstractStorageEngine.getPartitionIds().contains(1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.d2.balancer.D2Client;
import com.linkedin.davinci.listener.response.ServerCurrentVersionResponse;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestRequestBuilder;
import com.linkedin.r2.message.rest.RestResponse;
Expand Down Expand Up @@ -49,6 +50,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Encoder;
Expand Down Expand Up @@ -118,16 +120,10 @@ public void testCheckBeforeJoinCluster() {
.assertTrue(repository.getAllLocalStorageEngines().isEmpty(), "New node should not have any storage engine.");

// Create a storage engine.
String storeName = Version.composeKafkaTopic(Utils.getUniqueString("testCheckBeforeJoinCluster"), 1);
server.getVeniceServer()
.getStorageService()
.openStoreForNewPartition(
server.getVeniceServer().getConfigLoader().getStoreConfig(storeName),
1,
() -> null);
Assert.assertEquals(
String storeName = cluster.createStore(10);
Assert.assertNotEquals(
repository.getAllLocalStorageEngines().size(),
1,
0,
"We have created one storage engine for store: " + storeName);

// Restart server, as server's info leave in Helix cluster, so we expect that all local storage would NOT be
Expand All @@ -136,7 +132,8 @@ public void testCheckBeforeJoinCluster() {
cluster.stopVeniceServer(server.getPort());
cluster.restartVeniceServer(server.getPort());
repository = server.getVeniceServer().getStorageService().getStorageEngineRepository();
Assert.assertEquals(repository.getAllLocalStorageEngines().size(), 1, "We should not cleanup the local storage");
Assert
.assertNotEquals(repository.getAllLocalStorageEngines().size(), 0, "We should not cleanup the local storage");

// Stop server, remove it from the cluster then restart. We expect that all local storage would be deleted. Once
// the server join again.
Expand Down Expand Up @@ -182,6 +179,43 @@ public void testCheckBeforeJointClusterBeforeHelixInitializingCluster() throws E
}
}

/**
 * Starts a server, creates a store on it, stops the server, adds two more servers, restarts the original server,
 * and then expects the restarted server to end up with no local storage engines.
 */
@Test
public void testStartServerAndShutdownWithPartitionAssignmentVerification() {
  try (VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(1, 0, 0)) {
    // Server properties shared by every server added in this test.
    Properties serverFeatures = new Properties();
    serverFeatures.setProperty(SERVER_ENABLE_SERVER_ALLOW_LIST, Boolean.toString(true));
    serverFeatures.setProperty(SERVER_IS_AUTO_JOIN, Boolean.toString(true));

    cluster.addVeniceServer(serverFeatures, new Properties());
    VeniceServerWrapper server = cluster.getVeniceServers().get(0);
    Assert.assertTrue(server.getVeniceServer().isStarted());

    StorageService storageService = server.getVeniceServer().getStorageService();
    StorageEngineRepository initialRepository = storageService.getStorageEngineRepository();
    boolean startsEmpty = initialRepository.getAllLocalStorageEngines().isEmpty();
    Assert.assertTrue(startsEmpty, "New node should not have any storage engine.");

    // Creating a store is expected to yield exactly one local storage engine on the only server.
    String createdStore = cluster.createStore(1);
    String storeVersionTopic = Version.composeKafkaTopic(createdStore, 1);
    Assert.assertEquals(initialRepository.getAllLocalStorageEngines().size(), 1);
    Assert.assertTrue(server.getVeniceServer().getHelixParticipationService().isRunning());
    Assert.assertEquals(storageService.getStorageEngine(storeVersionTopic).getPartitionIds().size(), 3);

    int serverPort = server.getPort();
    cluster.stopVeniceServer(serverPort);

    // Bring up two more servers so the partition assignment moves away from the offline participant.
    cluster.addVeniceServer(serverFeatures, new Properties());
    cluster.addVeniceServer(serverFeatures, new Properties());

    cluster.restartVeniceServer(server.getPort());
    // Look the repository up again after the restart instead of reusing the pre-restart reference.
    StorageEngineRepository repositoryAfterRestart =
        server.getVeniceServer().getStorageService().getStorageEngineRepository();

    TestUtils.waitForNonDeterministicAssertion(3, TimeUnit.SECONDS, () -> {
      Assert.assertEquals(repositoryAfterRestart.getAllLocalStorageEngines().size(), 0);
    });
  }
}

@Test
public void testMetadataFetchRequest() throws ExecutionException, InterruptedException, IOException {
Utils.thisIsLocalhost();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6363,8 +6363,10 @@ private void createClusterIfRequired(String clusterName) {
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.DELAY_REBALANCE_TIME.name(), String.valueOf(delayedTime));
}
// Topology and fault zone type fields are used by CRUSH/CRUSHED/WAGED/etc alg. Helix would apply the constrains on
// these alg to choose proper instance to hold the replica.
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(), String.valueOf(true));
// Topology and fault zone type fields are used by the CRUSH algorithm. Helix applies these constraints to the
// CRUSH algorithm so that it chooses the proper instances to hold each replica.
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), "/" + HelixUtils.TOPOLOGY_CONSTRAINT);
helixClusterProperties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,11 @@ private List<AbstractVeniceService> createServices() {
return helixData;
});

managerFuture.thenApply(manager -> {
storageService.checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
return true;
});

heartbeatMonitoringService = new HeartbeatMonitoringService(
metricsRepository,
metadataRepo,
Expand Down

0 comments on commit 72ce72e

Please sign in to comment.