From 9a3d18c5187039519148fedcba3d6e363603212b Mon Sep 17 00:00:00 2001 From: Koorous Vargha <35252220+kvargha@users.noreply.github.com> Date: Mon, 11 Dec 2023 10:36:23 -0800 Subject: [PATCH] [admin-tool] Verify the migration status of system stores during migration (#786) * [admin-tool] Verify the migration status of system stores during migration --------- Co-authored-by: Koorous Vargha Co-authored-by: Koorous Vargha --- .../java/com/linkedin/venice/AdminTool.java | 91 +++++++++++++---- .../venice/TestCheckMigrationStatus.java | 97 +++++++++++++++++++ .../venice/endToEnd/TestStoreMigration.java | 50 +++++++++- 3 files changed, 216 insertions(+), 22 deletions(-) create mode 100644 clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestCheckMigrationStatus.java diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java index 666ed31dd6..7143c7e3db 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java @@ -558,7 +558,7 @@ public static void main(String args[]) throws Exception { } } - static CommandLine getCommandLine(String[] args) throws ParseException, IOException { + public static CommandLine getCommandLine(String[] args) throws ParseException, IOException { /** * Command Options are split up for help text formatting, see printUsageAndExit() * @@ -1614,29 +1614,67 @@ private static void migrateStore(CommandLine cmd) { + "To complete migration fabric by fabric, use admin-tool command --complete-migration."); } - private static void printMigrationStatus(ControllerClient controller, String storeName) { + @FunctionalInterface + public interface PrintFunction { + void apply(String message); + } + + private static void printMigrationStatus(ControllerClient controller, String storeName, PrintFunction printFunction) { StoreInfo store = controller.getStore(storeName).getStore(); - System.err.println("\n" + controller.getClusterName() + "\t" + controller.getLeaderControllerUrl()); + printFunction.apply("\n" + controller.getClusterName() + "\t" + controller.getLeaderControllerUrl()); if (store == null) { - System.err.println(storeName + " DOES NOT EXIST in this cluster " + controller.getClusterName()); + printFunction.apply(storeName + " DOES NOT EXIST in this cluster " + controller.getClusterName()); } else { - System.err.println(storeName + " exists in this cluster " + controller.getClusterName()); - System.err.println("\t" + storeName + ".isMigrating = " + store.isMigrating()); - System.err.println("\t" + storeName + ".largestUsedVersion = " + store.getLargestUsedVersionNumber()); - System.err.println("\t" + storeName + ".currentVersion = " + store.getCurrentVersion()); - System.err.println("\t" + storeName + ".versions = "); - store.getVersions().stream().forEach(version -> System.err.println("\t\t" + version.toString())); + printFunction.apply(storeName + " exists in this cluster " + controller.getClusterName()); + printFunction.apply("\t" + storeName + ".isMigrating = " + store.isMigrating()); + printFunction.apply("\t" + storeName + ".largestUsedVersion = " + store.getLargestUsedVersionNumber()); + printFunction.apply("\t" + storeName + ".currentVersion = " + store.getCurrentVersion()); + printFunction.apply("\t" + storeName + ".versions = "); + store.getVersions().stream().forEach(version -> printFunction.apply("\t\t" + version.toString())); } - System.err.println( + printFunction.apply( "\t" + storeName + " belongs to cluster " + controller.discoverCluster(storeName).getCluster() + " according to cluster discovery"); } + private static void printSystemStoreMigrationStatus( + ControllerClient controller, + String storeName, + PrintFunction printFunction) { + MultiStoreResponse clusterStores = controller.queryStoreList(); + + for (String currStoreName: clusterStores.getStores()) { + VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(currStoreName); + + if (systemStoreType != null && systemStoreType.extractRegularStoreName(currStoreName).equals(storeName)) { + printMigrationStatus(controller, currStoreName, printFunction); + } + } + } + private static void checkMigrationStatus(CommandLine cmd) { - String veniceUrl = getRequiredArgument(cmd, Arg.URL); + checkMigrationStatus(cmd, System.err::println); + } + + public static void checkMigrationStatus(CommandLine cmd, PrintFunction printFunction) { + checkMigrationStatus( + cmd, + printFunction, + new ControllerClient(getRequiredArgument(cmd, Arg.CLUSTER_SRC), getRequiredArgument(cmd, Arg.URL), sslFactory), + new ControllerClient( + getRequiredArgument(cmd, Arg.CLUSTER_DEST), + getRequiredArgument(cmd, Arg.URL), + sslFactory)); + } + + public static void checkMigrationStatus( + CommandLine cmd, + PrintFunction printFunction, + ControllerClient srcControllerClient, + ControllerClient destControllerClient) { String storeName = getRequiredArgument(cmd, Arg.STORE); String srcClusterName = getRequiredArgument(cmd, Arg.CLUSTER_SRC); String destClusterName = getRequiredArgument(cmd, Arg.CLUSTER_DEST); @@ -1644,27 +1682,38 @@ private static void checkMigrationStatus(CommandLine cmd) { throw new VeniceException("Source cluster and destination cluster cannot be the same!"); } - ControllerClient srcControllerClient = new ControllerClient(srcClusterName, veniceUrl, sslFactory); - ControllerClient destControllerClient = new ControllerClient(destClusterName, veniceUrl, sslFactory); - ChildAwareResponse response = srcControllerClient.listChildControllers(srcClusterName); + if (response.getChildDataCenterControllerUrlMap() == null && response.getChildDataCenterControllerD2Map() == null) { // This is a controller in single datacenter setup - printMigrationStatus(srcControllerClient, storeName); - printMigrationStatus(destControllerClient, storeName); + printMigrationStatus(srcControllerClient, storeName, printFunction); + printMigrationStatus(destControllerClient, storeName, printFunction); + + printSystemStoreMigrationStatus(srcControllerClient, storeName, printFunction); + printSystemStoreMigrationStatus(destControllerClient, storeName, printFunction); } else { // This is a parent controller System.err.println("\n=================== Parent Controllers ===================="); - printMigrationStatus(srcControllerClient, storeName); - printMigrationStatus(destControllerClient, storeName); + printMigrationStatus(srcControllerClient, storeName, printFunction); + printMigrationStatus(destControllerClient, storeName, printFunction); + + printSystemStoreMigrationStatus(srcControllerClient, storeName, printFunction); + printSystemStoreMigrationStatus(destControllerClient, storeName, printFunction); Map srcChildControllerClientMap = getControllerClientMap(srcClusterName, response); Map destChildControllerClientMap = getControllerClientMap(destClusterName, response); for (Map.Entry entry: srcChildControllerClientMap.entrySet()) { System.err.println("\n\n=================== Child Datacenter " + entry.getKey() + " ===================="); - printMigrationStatus(entry.getValue(), storeName); - printMigrationStatus(destChildControllerClientMap.get(entry.getKey()), storeName); + + ControllerClient srcChildController = entry.getValue(); + ControllerClient destChildController = destChildControllerClientMap.get(entry.getKey()); + + printMigrationStatus(srcChildController, storeName, printFunction); + printMigrationStatus(destChildController, storeName, printFunction); + + printSystemStoreMigrationStatus(srcChildController, storeName, printFunction); + printSystemStoreMigrationStatus(destChildController, storeName, printFunction); } } } diff --git a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestCheckMigrationStatus.java b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestCheckMigrationStatus.java new file mode 100644 index 0000000000..333fd41cf1 --- /dev/null +++ b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestCheckMigrationStatus.java @@ -0,0 +1,97 @@ +package com.linkedin.venice; + +import com.linkedin.venice.controllerapi.ChildAwareResponse; +import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse; +import com.linkedin.venice.controllerapi.MultiStoreResponse; +import com.linkedin.venice.controllerapi.StoreResponse; +import com.linkedin.venice.meta.StoreInfo; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import org.apache.commons.cli.ParseException; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class TestCheckMigrationStatus { + String SRC_CLUSTER_NAME = "venice-source"; + String DESTINATION_CLUSTER_NAME = "venice-destination"; + String CONTROLLER_URL = "http://somePlaceElse.com:12345"; + String STORE_NAME = "TestStore"; + String SYSTEM_STORE_NAME = "venice_system_store_davinci_" + STORE_NAME; + + @Test + public void testCheckMigrationStatus() throws ParseException, IOException { + String[] checkMigrationStatusArgs = { "--migration-status", "--url", CONTROLLER_URL, "--store", STORE_NAME, + "--cluster-src", SRC_CLUSTER_NAME, "--cluster-dest", DESTINATION_CLUSTER_NAME }; + + // Mock source controller client + ControllerClient mockSourceControllerClient = Mockito.mock(ControllerClient.class); + StoreInfo mockStoreInfo = new StoreInfo(); + mockStoreInfo.setVersions(new ArrayList<>()); + StoreResponse mockStoreResponse = Mockito.mock(StoreResponse.class); + MultiStoreResponse mockMultiStoreResponse = new MultiStoreResponse(); + D2ServiceDiscoveryResponse mockD2ServiceDiscoveryResponse = Mockito.mock(D2ServiceDiscoveryResponse.class); + Mockito.when(mockD2ServiceDiscoveryResponse.getCluster()).thenReturn(SRC_CLUSTER_NAME); + mockMultiStoreResponse.setStores(new String[] { SYSTEM_STORE_NAME, STORE_NAME }); + ChildAwareResponse mockChildAwareResponse = Mockito.mock(ChildAwareResponse.class); + Mockito.when(mockChildAwareResponse.getChildDataCenterControllerUrlMap()).thenReturn(new HashMap<>()); + Mockito.when(mockChildAwareResponse.getChildDataCenterControllerD2Map()).thenReturn(new HashMap<>()); + Mockito.when(mockSourceControllerClient.listChildControllers(SRC_CLUSTER_NAME)).thenReturn(mockChildAwareResponse); + Mockito.when(mockStoreResponse.getStore()).thenReturn(mockStoreInfo); + Mockito.when(mockSourceControllerClient.getStore(STORE_NAME)).thenReturn(mockStoreResponse); + Mockito.when(mockSourceControllerClient.getStore(SYSTEM_STORE_NAME)).thenReturn(mockStoreResponse); + Mockito.when(mockSourceControllerClient.getClusterName()).thenReturn(SRC_CLUSTER_NAME); + Mockito.when(mockSourceControllerClient.getLeaderControllerUrl()).thenReturn(CONTROLLER_URL); + Mockito.when(mockSourceControllerClient.discoverCluster(STORE_NAME)).thenReturn(mockD2ServiceDiscoveryResponse); + Mockito.when(mockSourceControllerClient.queryStoreList()).thenReturn(mockMultiStoreResponse); + + ControllerClient mockDestinationControllerClient = Mockito.mock(ControllerClient.class); + mockMultiStoreResponse.setStores(new String[] { SYSTEM_STORE_NAME, STORE_NAME }); + Mockito.when(mockStoreResponse.getStore()).thenReturn(mockStoreInfo); + Mockito.when(mockDestinationControllerClient.getStore(STORE_NAME)).thenReturn(mockStoreResponse); + Mockito.when(mockDestinationControllerClient.getStore(SYSTEM_STORE_NAME)).thenReturn(mockStoreResponse); + Mockito.when(mockDestinationControllerClient.getClusterName()).thenReturn(SRC_CLUSTER_NAME); + Mockito.when(mockDestinationControllerClient.getLeaderControllerUrl()).thenReturn(CONTROLLER_URL); + Mockito.when(mockDestinationControllerClient.discoverCluster(STORE_NAME)) + .thenReturn(mockD2ServiceDiscoveryResponse); + Mockito.when(mockDestinationControllerClient.queryStoreList()).thenReturn(mockMultiStoreResponse); + + // Store migration status output via closure PrintFunction + Set statusOutput = new HashSet(); + AdminTool.PrintFunction printFunction = (message) -> { + statusOutput.add(message.trim()); + }; + + AdminTool.checkMigrationStatus( + AdminTool.getCommandLine(checkMigrationStatusArgs), + printFunction, + mockSourceControllerClient, + mockDestinationControllerClient); + + Assert.assertTrue(statusOutput.contains(String.format("%s\t%s", SRC_CLUSTER_NAME, CONTROLLER_URL))); + Assert + .assertTrue(statusOutput.contains(String.format("%s exists in this cluster %s", STORE_NAME, SRC_CLUSTER_NAME))); + Assert.assertTrue(statusOutput.contains(String.format("%s.isMigrating = false", STORE_NAME))); + Assert.assertTrue(statusOutput.contains(String.format("%s.largestUsedVersion = 0", STORE_NAME))); + Assert.assertTrue(statusOutput.contains(String.format("%s.currentVersion = 0", STORE_NAME))); + Assert.assertTrue(statusOutput.contains(String.format("%s.versions =", STORE_NAME))); + Assert.assertTrue( + statusOutput.contains( + String.format("%s belongs to cluster %s according to cluster discovery", STORE_NAME, SRC_CLUSTER_NAME))); + + Assert.assertFalse(statusOutput.contains(String.format("%s\t%s", DESTINATION_CLUSTER_NAME, CONTROLLER_URL))); + Assert.assertFalse( + statusOutput.contains(String.format("%s exists in this cluster %s", STORE_NAME, DESTINATION_CLUSTER_NAME))); + Assert.assertFalse( + statusOutput.contains( + String.format( + "%s belongs to cluster %s according to cluster discovery", + STORE_NAME, + DESTINATION_CLUSTER_NAME))); + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java index 9d8b326f62..b405a85cb1 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreMigration.java @@ -18,6 +18,7 @@ import com.linkedin.davinci.client.DaVinciClient; import com.linkedin.davinci.client.DaVinciConfig; import com.linkedin.venice.AdminTool; +import com.linkedin.venice.AdminTool.PrintFunction; import com.linkedin.venice.D2.D2ClientUtils; import com.linkedin.venice.client.store.AbstractAvroStoreClient; import com.linkedin.venice.client.store.AvroGenericStoreClient; @@ -56,8 +57,10 @@ import java.io.File; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.avro.Schema; @@ -292,8 +295,8 @@ public void testStoreMigrationWithDaVinciPushStatusSystemStore() throws Exceptio createAndPushStore(srcClusterName, storeName); // DaVinci push status system store is enabled by default. Check if it has online version. + String systemStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(storeName); try (ControllerClient srcChildControllerClient = new ControllerClient(srcClusterName, childControllerUrl0)) { - String systemStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(storeName); TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { StoreResponse storeResponse = srcChildControllerClient.getStore(systemStoreName); Assert.assertFalse(storeResponse.isError()); @@ -325,6 +328,30 @@ public void testStoreMigrationWithDaVinciPushStatusSystemStore() throws Exceptio .assertEquals(pushStatusStoreReader.getPartitionStatus(storeName, 1, 0, Optional.empty()).size(), 1)); startMigration(parentControllerUrl, storeName); + + // Store migration status output via closure PrintFunction + Set statusOutput = new HashSet(); + PrintFunction printFunction = (message) -> { + statusOutput.add(message.trim()); + System.err.println(message); + }; + + checkMigrationStatus(parentControllerUrl, storeName, printFunction); + + // Check that store and system store exists in both source and destination cluster + Assert.assertTrue( + statusOutput.contains( + String.format( + "%s belongs to cluster venice-cluster0 according to cluster discovery", + storeName, + srcClusterName))); + Assert + .assertTrue(statusOutput.contains(String.format("%s exists in this cluster %s", storeName, destClusterName))); + Assert.assertTrue( + statusOutput.contains(String.format("%s exists in this cluster %s", systemStoreName, srcClusterName))); + Assert.assertTrue( + statusOutput.contains(String.format("%s exists in this cluster %s", systemStoreName, destClusterName))); + completeMigration(parentControllerUrl, storeName); // Verify the da vinci push status system store is materialized in dest cluster and contains the same value @@ -333,6 +360,20 @@ public void testStoreMigrationWithDaVinciPushStatusSystemStore() throws Exceptio TimeUnit.SECONDS, () -> Assert .assertEquals(pushStatusStoreReader.getPartitionStatus(storeName, 1, 0, Optional.empty()).size(), 1)); + + // Verify that store and system store only exist in destination cluster after ending migration + statusOutput.clear(); + endMigration(parentControllerUrl, storeName); + checkMigrationStatus(parentControllerUrl, storeName, printFunction); + + Assert + .assertFalse(statusOutput.contains(String.format("%s exists in this cluster %s", storeName, srcClusterName))); + Assert + .assertTrue(statusOutput.contains(String.format("%s exists in this cluster %s", storeName, destClusterName))); + Assert.assertFalse( + statusOutput.contains(String.format("%s exists in this cluster %s", systemStoreName, srcClusterName))); + Assert.assertTrue( + statusOutput.contains(String.format("%s exists in this cluster %s", systemStoreName, destClusterName))); } finally { Utils.closeQuietlyWithErrorLogged(pushStatusStoreReader); D2ClientUtils.shutdownClient(d2Client); @@ -471,6 +512,13 @@ private void startMigration(String controllerUrl, String storeName) throws Excep AdminTool.main(startMigrationArgs); } + private void checkMigrationStatus(String controllerUrl, String storeName, PrintFunction printFunction) + throws Exception { + String[] checkMigrationStatusArgs = { "--migration-status", "--url", controllerUrl, "--store", storeName, + "--cluster-src", srcClusterName, "--cluster-dest", destClusterName }; + AdminTool.checkMigrationStatus(AdminTool.getCommandLine(checkMigrationStatusArgs), printFunction); + } + private void completeMigration(String controllerUrl, String storeName) { String[] completeMigration0 = { "--complete-migration", "--url", controllerUrl, "--store", storeName, "--cluster-src", srcClusterName, "--cluster-dest", destClusterName, "--fabric", FABRIC0 };