Skip to content

Commit

Permalink
[admin-tool] Verify the migration status of system stores during migr…
Browse files Browse the repository at this point in the history
…ation (#786)

* [admin-tool] Verify the migration status of system stores during migration
---------

Co-authored-by: Koorous Vargha <[email protected]>
Co-authored-by: Koorous Vargha <[email protected]>
  • Loading branch information
3 people authored Dec 11, 2023
1 parent 8fc7a54 commit 9a3d18c
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ public static void main(String args[]) throws Exception {
}
}

static CommandLine getCommandLine(String[] args) throws ParseException, IOException {
public static CommandLine getCommandLine(String[] args) throws ParseException, IOException {
/**
* Command Options are split up for help text formatting, see printUsageAndExit()
*
Expand Down Expand Up @@ -1614,57 +1614,106 @@ private static void migrateStore(CommandLine cmd) {
+ "To complete migration fabric by fabric, use admin-tool command --complete-migration.");
}

private static void printMigrationStatus(ControllerClient controller, String storeName) {
@FunctionalInterface
public interface PrintFunction {
void apply(String message);
}

private static void printMigrationStatus(ControllerClient controller, String storeName, PrintFunction printFunction) {
StoreInfo store = controller.getStore(storeName).getStore();

System.err.println("\n" + controller.getClusterName() + "\t" + controller.getLeaderControllerUrl());
printFunction.apply("\n" + controller.getClusterName() + "\t" + controller.getLeaderControllerUrl());

if (store == null) {
System.err.println(storeName + " DOES NOT EXIST in this cluster " + controller.getClusterName());
printFunction.apply(storeName + " DOES NOT EXIST in this cluster " + controller.getClusterName());
} else {
System.err.println(storeName + " exists in this cluster " + controller.getClusterName());
System.err.println("\t" + storeName + ".isMigrating = " + store.isMigrating());
System.err.println("\t" + storeName + ".largestUsedVersion = " + store.getLargestUsedVersionNumber());
System.err.println("\t" + storeName + ".currentVersion = " + store.getCurrentVersion());
System.err.println("\t" + storeName + ".versions = ");
store.getVersions().stream().forEach(version -> System.err.println("\t\t" + version.toString()));
printFunction.apply(storeName + " exists in this cluster " + controller.getClusterName());
printFunction.apply("\t" + storeName + ".isMigrating = " + store.isMigrating());
printFunction.apply("\t" + storeName + ".largestUsedVersion = " + store.getLargestUsedVersionNumber());
printFunction.apply("\t" + storeName + ".currentVersion = " + store.getCurrentVersion());
printFunction.apply("\t" + storeName + ".versions = ");
store.getVersions().stream().forEach(version -> printFunction.apply("\t\t" + version.toString()));
}

System.err.println(
printFunction.apply(
"\t" + storeName + " belongs to cluster " + controller.discoverCluster(storeName).getCluster()
+ " according to cluster discovery");
}

private static void printSystemStoreMigrationStatus(
ControllerClient controller,
String storeName,
PrintFunction printFunction) {
MultiStoreResponse clusterStores = controller.queryStoreList();

for (String currStoreName: clusterStores.getStores()) {
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(currStoreName);

if (systemStoreType != null && systemStoreType.extractRegularStoreName(currStoreName).equals(storeName)) {
printMigrationStatus(controller, currStoreName, printFunction);
}
}
}

private static void checkMigrationStatus(CommandLine cmd) {
String veniceUrl = getRequiredArgument(cmd, Arg.URL);
checkMigrationStatus(cmd, System.err::println);
}

public static void checkMigrationStatus(CommandLine cmd, PrintFunction printFunction) {
checkMigrationStatus(
cmd,
printFunction,
new ControllerClient(getRequiredArgument(cmd, Arg.CLUSTER_SRC), getRequiredArgument(cmd, Arg.URL), sslFactory),
new ControllerClient(
getRequiredArgument(cmd, Arg.CLUSTER_DEST),
getRequiredArgument(cmd, Arg.URL),
sslFactory));
}

public static void checkMigrationStatus(
CommandLine cmd,
PrintFunction printFunction,
ControllerClient srcControllerClient,
ControllerClient destControllerClient) {
String storeName = getRequiredArgument(cmd, Arg.STORE);
String srcClusterName = getRequiredArgument(cmd, Arg.CLUSTER_SRC);
String destClusterName = getRequiredArgument(cmd, Arg.CLUSTER_DEST);
if (srcClusterName.equals(destClusterName)) {
throw new VeniceException("Source cluster and destination cluster cannot be the same!");
}

ControllerClient srcControllerClient = new ControllerClient(srcClusterName, veniceUrl, sslFactory);
ControllerClient destControllerClient = new ControllerClient(destClusterName, veniceUrl, sslFactory);

ChildAwareResponse response = srcControllerClient.listChildControllers(srcClusterName);

if (response.getChildDataCenterControllerUrlMap() == null && response.getChildDataCenterControllerD2Map() == null) {
// This is a controller in single datacenter setup
printMigrationStatus(srcControllerClient, storeName);
printMigrationStatus(destControllerClient, storeName);
printMigrationStatus(srcControllerClient, storeName, printFunction);
printMigrationStatus(destControllerClient, storeName, printFunction);

printSystemStoreMigrationStatus(srcControllerClient, storeName, printFunction);
printSystemStoreMigrationStatus(destControllerClient, storeName, printFunction);
} else {
// This is a parent controller
System.err.println("\n=================== Parent Controllers ====================");
printMigrationStatus(srcControllerClient, storeName);
printMigrationStatus(destControllerClient, storeName);
printMigrationStatus(srcControllerClient, storeName, printFunction);
printMigrationStatus(destControllerClient, storeName, printFunction);

printSystemStoreMigrationStatus(srcControllerClient, storeName, printFunction);
printSystemStoreMigrationStatus(destControllerClient, storeName, printFunction);

Map<String, ControllerClient> srcChildControllerClientMap = getControllerClientMap(srcClusterName, response);
Map<String, ControllerClient> destChildControllerClientMap = getControllerClientMap(destClusterName, response);

for (Map.Entry<String, ControllerClient> entry: srcChildControllerClientMap.entrySet()) {
System.err.println("\n\n=================== Child Datacenter " + entry.getKey() + " ====================");
printMigrationStatus(entry.getValue(), storeName);
printMigrationStatus(destChildControllerClientMap.get(entry.getKey()), storeName);

ControllerClient srcChildController = entry.getValue();
ControllerClient destChildController = destChildControllerClientMap.get(entry.getKey());

printMigrationStatus(srcChildController, storeName, printFunction);
printMigrationStatus(destChildController, storeName, printFunction);

printSystemStoreMigrationStatus(srcChildController, storeName, printFunction);
printSystemStoreMigrationStatus(destChildController, storeName, printFunction);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.linkedin.venice;

import com.linkedin.venice.controllerapi.ChildAwareResponse;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse;
import com.linkedin.venice.controllerapi.MultiStoreResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.meta.StoreInfo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.cli.ParseException;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;


public class TestCheckMigrationStatus {
String SRC_CLUSTER_NAME = "venice-source";
String DESTINATION_CLUSTER_NAME = "venice-destination";
String CONTROLLER_URL = "http://somePlaceElse.com:12345";
String STORE_NAME = "TestStore";
String SYSTEM_STORE_NAME = "venice_system_store_davinci_" + STORE_NAME;

@Test
public void testCheckMigrationStatus() throws ParseException, IOException {
String[] checkMigrationStatusArgs = { "--migration-status", "--url", CONTROLLER_URL, "--store", STORE_NAME,
"--cluster-src", SRC_CLUSTER_NAME, "--cluster-dest", DESTINATION_CLUSTER_NAME };

// Mock source controller client
ControllerClient mockSourceControllerClient = Mockito.mock(ControllerClient.class);
StoreInfo mockStoreInfo = new StoreInfo();
mockStoreInfo.setVersions(new ArrayList<>());
StoreResponse mockStoreResponse = Mockito.mock(StoreResponse.class);
MultiStoreResponse mockMultiStoreResponse = new MultiStoreResponse();
D2ServiceDiscoveryResponse mockD2ServiceDiscoveryResponse = Mockito.mock(D2ServiceDiscoveryResponse.class);
Mockito.when(mockD2ServiceDiscoveryResponse.getCluster()).thenReturn(SRC_CLUSTER_NAME);
mockMultiStoreResponse.setStores(new String[] { SYSTEM_STORE_NAME, STORE_NAME });
ChildAwareResponse mockChildAwareResponse = Mockito.mock(ChildAwareResponse.class);
Mockito.when(mockChildAwareResponse.getChildDataCenterControllerUrlMap()).thenReturn(new HashMap<>());
Mockito.when(mockChildAwareResponse.getChildDataCenterControllerD2Map()).thenReturn(new HashMap<>());
Mockito.when(mockSourceControllerClient.listChildControllers(SRC_CLUSTER_NAME)).thenReturn(mockChildAwareResponse);
Mockito.when(mockStoreResponse.getStore()).thenReturn(mockStoreInfo);
Mockito.when(mockSourceControllerClient.getStore(STORE_NAME)).thenReturn(mockStoreResponse);
Mockito.when(mockSourceControllerClient.getStore(SYSTEM_STORE_NAME)).thenReturn(mockStoreResponse);
Mockito.when(mockSourceControllerClient.getClusterName()).thenReturn(SRC_CLUSTER_NAME);
Mockito.when(mockSourceControllerClient.getLeaderControllerUrl()).thenReturn(CONTROLLER_URL);
Mockito.when(mockSourceControllerClient.discoverCluster(STORE_NAME)).thenReturn(mockD2ServiceDiscoveryResponse);
Mockito.when(mockSourceControllerClient.queryStoreList()).thenReturn(mockMultiStoreResponse);

ControllerClient mockDestinationControllerClient = Mockito.mock(ControllerClient.class);
mockMultiStoreResponse.setStores(new String[] { SYSTEM_STORE_NAME, STORE_NAME });
Mockito.when(mockStoreResponse.getStore()).thenReturn(mockStoreInfo);
Mockito.when(mockDestinationControllerClient.getStore(STORE_NAME)).thenReturn(mockStoreResponse);
Mockito.when(mockDestinationControllerClient.getStore(SYSTEM_STORE_NAME)).thenReturn(mockStoreResponse);
Mockito.when(mockDestinationControllerClient.getClusterName()).thenReturn(SRC_CLUSTER_NAME);
Mockito.when(mockDestinationControllerClient.getLeaderControllerUrl()).thenReturn(CONTROLLER_URL);
Mockito.when(mockDestinationControllerClient.discoverCluster(STORE_NAME))
.thenReturn(mockD2ServiceDiscoveryResponse);
Mockito.when(mockDestinationControllerClient.queryStoreList()).thenReturn(mockMultiStoreResponse);

// Store migration status output via closure PrintFunction
Set<String> statusOutput = new HashSet<String>();
AdminTool.PrintFunction printFunction = (message) -> {
statusOutput.add(message.trim());
};

AdminTool.checkMigrationStatus(
AdminTool.getCommandLine(checkMigrationStatusArgs),
printFunction,
mockSourceControllerClient,
mockDestinationControllerClient);

Assert.assertTrue(statusOutput.contains(String.format("%s\t%s", SRC_CLUSTER_NAME, CONTROLLER_URL)));
Assert
.assertTrue(statusOutput.contains(String.format("%s exists in this cluster %s", STORE_NAME, SRC_CLUSTER_NAME)));
Assert.assertTrue(statusOutput.contains(String.format("%s.isMigrating = false", STORE_NAME)));
Assert.assertTrue(statusOutput.contains(String.format("%s.largestUsedVersion = 0", STORE_NAME)));
Assert.assertTrue(statusOutput.contains(String.format("%s.currentVersion = 0", STORE_NAME)));
Assert.assertTrue(statusOutput.contains(String.format("%s.versions =", STORE_NAME)));
Assert.assertTrue(
statusOutput.contains(
String.format("%s belongs to cluster %s according to cluster discovery", STORE_NAME, SRC_CLUSTER_NAME)));

Assert.assertFalse(statusOutput.contains(String.format("%s\t%s", DESTINATION_CLUSTER_NAME, CONTROLLER_URL)));
Assert.assertFalse(
statusOutput.contains(String.format("%s exists in this cluster %s", STORE_NAME, DESTINATION_CLUSTER_NAME)));
Assert.assertFalse(
statusOutput.contains(
String.format(
"%s belongs to cluster %s according to cluster discovery",
STORE_NAME,
DESTINATION_CLUSTER_NAME)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.venice.AdminTool;
import com.linkedin.venice.AdminTool.PrintFunction;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.client.store.AbstractAvroStoreClient;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
Expand Down Expand Up @@ -56,8 +57,10 @@
import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -292,8 +295,8 @@ public void testStoreMigrationWithDaVinciPushStatusSystemStore() throws Exceptio
createAndPushStore(srcClusterName, storeName);

// DaVinci push status system store is enabled by default. Check if it has online version.
String systemStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(storeName);
try (ControllerClient srcChildControllerClient = new ControllerClient(srcClusterName, childControllerUrl0)) {
String systemStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(storeName);
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
StoreResponse storeResponse = srcChildControllerClient.getStore(systemStoreName);
Assert.assertFalse(storeResponse.isError());
Expand Down Expand Up @@ -325,6 +328,30 @@ public void testStoreMigrationWithDaVinciPushStatusSystemStore() throws Exceptio
.assertEquals(pushStatusStoreReader.getPartitionStatus(storeName, 1, 0, Optional.empty()).size(), 1));

startMigration(parentControllerUrl, storeName);

// Store migration status output via closure PrintFunction
Set<String> statusOutput = new HashSet<String>();
PrintFunction printFunction = (message) -> {
statusOutput.add(message.trim());
System.err.println(message);
};

checkMigrationStatus(parentControllerUrl, storeName, printFunction);

// Check that store and system store exists in both source and destination cluster
Assert.assertTrue(
statusOutput.contains(
String.format(
"%s belongs to cluster venice-cluster0 according to cluster discovery",
storeName,
srcClusterName)));
Assert
.assertTrue(statusOutput.contains(String.format("%s exists in this cluster %s", storeName, destClusterName)));
Assert.assertTrue(
statusOutput.contains(String.format("%s exists in this cluster %s", systemStoreName, srcClusterName)));
Assert.assertTrue(
statusOutput.contains(String.format("%s exists in this cluster %s", systemStoreName, destClusterName)));

completeMigration(parentControllerUrl, storeName);

// Verify the da vinci push status system store is materialized in dest cluster and contains the same value
Expand All @@ -333,6 +360,20 @@ public void testStoreMigrationWithDaVinciPushStatusSystemStore() throws Exceptio
TimeUnit.SECONDS,
() -> Assert
.assertEquals(pushStatusStoreReader.getPartitionStatus(storeName, 1, 0, Optional.empty()).size(), 1));

// Verify that store and system store only exist in destination cluster after ending migration
statusOutput.clear();
endMigration(parentControllerUrl, storeName);
checkMigrationStatus(parentControllerUrl, storeName, printFunction);

Assert
.assertFalse(statusOutput.contains(String.format("%s exists in this cluster %s", storeName, srcClusterName)));
Assert
.assertTrue(statusOutput.contains(String.format("%s exists in this cluster %s", storeName, destClusterName)));
Assert.assertFalse(
statusOutput.contains(String.format("%s exists in this cluster %s", systemStoreName, srcClusterName)));
Assert.assertTrue(
statusOutput.contains(String.format("%s exists in this cluster %s", systemStoreName, destClusterName)));
} finally {
Utils.closeQuietlyWithErrorLogged(pushStatusStoreReader);
D2ClientUtils.shutdownClient(d2Client);
Expand Down Expand Up @@ -471,6 +512,13 @@ private void startMigration(String controllerUrl, String storeName) throws Excep
AdminTool.main(startMigrationArgs);
}

private void checkMigrationStatus(String controllerUrl, String storeName, PrintFunction printFunction)
throws Exception {
String[] checkMigrationStatusArgs = { "--migration-status", "--url", controllerUrl, "--store", storeName,
"--cluster-src", srcClusterName, "--cluster-dest", destClusterName };
AdminTool.checkMigrationStatus(AdminTool.getCommandLine(checkMigrationStatusArgs), printFunction);
}

private void completeMigration(String controllerUrl, String storeName) {
String[] completeMigration0 = { "--complete-migration", "--url", controllerUrl, "--store", storeName,
"--cluster-src", srcClusterName, "--cluster-dest", destClusterName, "--fabric", FABRIC0 };
Expand Down

0 comments on commit 9a3d18c

Please sign in to comment.