From c1a9ab17aa689c62fed2eaa5a33282da106666d1 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Tue, 24 Oct 2023 15:55:13 -0700 Subject: [PATCH] [admin-tool][controller] Add new admin tool command to configure/remove particular view for store (#695) This PR introduces a new AdminTool command to configure (add/update/remove) a view of a store. The command name is --configure-store-view. It takes the following parameter: --view-name: Required at all time. Value: Name of the view to act on. --remove-view: If present, this action is to remove an existing view from the store config. No Value. --view-class: Required when it is not removing a view. Value: View class name of the view. --view-params: Optional. Value: Additional parameters for the view in the format of string to string map. If not specified, meaning it is going to use empty map, instead of inheriting existing value. The actual logic is inside update store logic. The reason to separate it into a new command is to make it more user friendly to specify the intention, instead of putting everything into a Json map. For the existing setView() API in UpdateStoreQueryParams, this PR changes the behavior from merging the input into existing view config to completely overwrite the old config, as it is overlapping with the functionality with the new command to some extent. --- .../java/com/linkedin/venice/AdminTool.java | 26 +++- .../main/java/com/linkedin/venice/Arg.java | 5 +- .../java/com/linkedin/venice/Command.java | 8 ++ .../com/linkedin/venice/TestAdminTool.java | 41 ++++++ .../controllerapi/ControllerApiConstants.java | 4 + .../controllerapi/UpdateStoreQueryParams.java | 36 ++++++ .../endToEnd/TestActiveActiveIngestion.java | 44 +++++-- .../venice/controller/StoreViewUtils.java | 29 +++-- .../venice/controller/VeniceHelixAdmin.java | 37 +++++- .../controller/VeniceParentHelixAdmin.java | 54 +++++--- .../TestVeniceParentHelixAdmin.java | 120 +++++++++++++++++- 11 files changed, 358 insertions(+), 46 deletions(-) diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java index 91f83a9bb4..82df29d470 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java @@ -542,6 +542,9 @@ public static void main(String args[]) throws Exception { case DUMP_INGESTION_STATE: dumpIngestionState(cmd); break; + case CONFIGURE_STORE_VIEW: + configureStoreView(cmd); + break; default: StringJoiner availableCommands = new StringJoiner(", "); for (Command c: Command.values()) { @@ -1013,7 +1016,6 @@ private static void genericParam( private static void updateStore(CommandLine cmd) { UpdateStoreQueryParams params = getUpdateStoreQueryParams(cmd); - params.setStoreViews(new HashMap<>()); String storeName = getRequiredArgument(cmd, Arg.STORE, Command.UPDATE_STORE); ControllerResponse response = controllerClient.updateStore(storeName, params); printSuccess(response); @@ -1026,6 +1028,21 @@ private static void updateClusterConfig(CommandLine cmd) { printSuccess(response); } + static UpdateStoreQueryParams getConfigureStoreViewQueryParams(CommandLine cmd) { + Set argSet = new HashSet<>(Arrays.asList(Command.CONFIGURE_STORE_VIEW.getOptionalArgs())); + argSet.addAll(new HashSet<>(Arrays.asList(Command.CONFIGURE_STORE_VIEW.getRequiredArgs()))); + UpdateStoreQueryParams params = new UpdateStoreQueryParams(); + params.setViewName(getRequiredArgument(cmd, Arg.VIEW_NAME)); + if (cmd.hasOption(Arg.REMOVE_VIEW.toString())) { + params.setDisableStoreView(); + } else { + // If configuring a view, view class name is required. + params.setViewClassName(getRequiredArgument(cmd, Arg.VIEW_CLASS)); + } + stringMapParam(cmd, Arg.VIEW_PARAMS, p -> params.setViewClassParams(p), argSet); + return params; + } + static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) { Set argSet = new HashSet<>(Arrays.asList(Command.UPDATE_STORE.getOptionalArgs())); argSet.addAll(new HashSet<>(Arrays.asList(Command.UPDATE_STORE.getRequiredArgs()))); @@ -2881,6 +2898,13 @@ private static void dumpIngestionState(CommandLine cmd) throws Exception { } } + private static void configureStoreView(CommandLine cmd) { + UpdateStoreQueryParams params = getConfigureStoreViewQueryParams(cmd); + String storeName = getRequiredArgument(cmd, Arg.STORE, Command.CONFIGURE_STORE_VIEW); + ControllerResponse response = controllerClient.updateStore(storeName, params); + printObject(response); + } + static void dumpIngestionState(TransportClient transportClient, String storeName, String version, String partition) throws Exception { StringBuilder sb = new StringBuilder(QueryAction.ADMIN.toString().toLowerCase()).append("/") diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java index a0d26b304e..44720e1769 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java @@ -217,7 +217,10 @@ public enum Arg { STORE_VIEW_CONFIGS( "storage-view-configs", "svc", true, "Config that describes views to be added for a store. Input is a json map. Example: {\"ExampleView\": {\"viewClassName\": \"com.linkedin.venice.views.ChangeCaptureView\",\"params\": {}}}" - ), + ), VIEW_NAME("view-name", "vn", true, "Name of a store view"), + VIEW_CLASS("view-class", "vc", true, "Name of a store view class"), + VIEW_PARAMS("view-params", "vp", true, "Additional parameter map of a store view class"), + REMOVE_VIEW("remove-view", "rv", false, "Optional config to specify to disable certain store view"), PARTITION_DETAIL_ENABLED( "partition-detail-enabled", "pde", true, "A flag to indicate whether to retrieve partition details" ), diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java index 79f30a80cd..a518b67a69 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java @@ -81,6 +81,7 @@ import static com.linkedin.venice.Arg.RECOVERY_COMMAND; import static com.linkedin.venice.Arg.REGIONS_FILTER; import static com.linkedin.venice.Arg.REGULAR_VERSION_ETL_ENABLED; +import static com.linkedin.venice.Arg.REMOVE_VIEW; import static com.linkedin.venice.Arg.REPLICATE_ALL_CONFIGS; import static com.linkedin.venice.Arg.REPLICATION_FACTOR; import static com.linkedin.venice.Arg.RETRY; @@ -107,6 +108,9 @@ import static com.linkedin.venice.Arg.VENICE_CLIENT_SSL_CONFIG_FILE; import static com.linkedin.venice.Arg.VENICE_ZOOKEEPER_URL; import static com.linkedin.venice.Arg.VERSION; +import static com.linkedin.venice.Arg.VIEW_CLASS; +import static com.linkedin.venice.Arg.VIEW_NAME; +import static com.linkedin.venice.Arg.VIEW_PARAMS; import static com.linkedin.venice.Arg.VOLDEMORT_STORE; import static com.linkedin.venice.Arg.VSON_STORE; import static com.linkedin.venice.Arg.WRITEABILITY; @@ -474,6 +478,10 @@ public enum Command { "dump-ingestion-state", "Dump the real-time ingestion state for a certain store version in a certain storage node", new Arg[] { SERVER_URL, STORE, VERSION }, new Arg[] { PARTITION } + ), + CONFIGURE_STORE_VIEW( + "configure-store-view", "Configure store view of a certain store", new Arg[] { URL, CLUSTER, STORE, VIEW_NAME }, + new Arg[] { VIEW_CLASS, VIEW_PARAMS, REMOVE_VIEW } ); private final String commandName; diff --git a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java index ba1612bc77..e7cef8c86e 100644 --- a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java +++ b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java @@ -28,6 +28,7 @@ import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serializer.FastSerializerDeserializerFactory; import com.linkedin.venice.serializer.RecordSerializer; +import com.linkedin.venice.views.ChangeCaptureView; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -284,4 +285,44 @@ public void testAdminToolDumpIngestionState() throws Exception { doReturn(completableFuture).when(transportClient).get(anyString()); AdminTool.dumpIngestionState(transportClient, storeName, version, null); } + + @Test + public void testAdminConfigureView() throws ParseException, IOException { + final String K1 = "kafka.linger.ms", V1 = "0", K2 = "dummy.key", V2 = "dummy.val"; + + // Case 1: Happy path to setup a view. + String[] args = { "--configure-store-view", "--url", "http://localhost:7036", "--cluster", "test-cluster", + "--store", "testStore", "--view-name", "testView", "--view-class", ChangeCaptureView.class.getCanonicalName(), + "--view-params", K1 + "=" + V1 + "," + K2 + "=" + V2 }; + + CommandLine commandLine = AdminTool.getCommandLine(args); + UpdateStoreQueryParams params = AdminTool.getConfigureStoreViewQueryParams(commandLine); + Assert.assertTrue(params.getViewName().isPresent()); + Assert.assertEquals(params.getViewName().get(), "testView"); + Assert.assertTrue(params.getViewClassName().isPresent()); + Assert.assertEquals(params.getViewClassName().get(), ChangeCaptureView.class.getCanonicalName()); + + Optional> viewParams = params.getViewClassParams(); + Assert.assertTrue(viewParams.isPresent()); + Map viewParamsMap = viewParams.get(); + Assert.assertEquals(viewParamsMap.get(K1), V1); + Assert.assertEquals(viewParamsMap.get(K2), V2); + + // Case 2: Happy path to disable a view. + String[] args1 = { "--configure-store-view", "--url", "http://localhost:7036", "--cluster", "test-cluster", + "--store", "testStore", "--view-name", "testView", "--remove-view" }; + commandLine = AdminTool.getCommandLine(args1); + params = AdminTool.getConfigureStoreViewQueryParams(commandLine); + Assert.assertTrue(params.getViewName().isPresent()); + Assert.assertTrue(params.getDisableStoreView().isPresent()); + Assert.assertEquals(params.getViewName().get(), "testView"); + Assert.assertFalse(params.getViewClassName().isPresent()); + + // Case 3: Configure view with missing viewName; + String[] args2 = { "--configure-store-view", "--url", "http://localhost:7036", "--cluster", "test-cluster", + "--store", "testStore", "--view-name", "testView" }; + commandLine = AdminTool.getCommandLine(args2); + CommandLine finalCommandLine = commandLine; + Assert.assertThrows(() -> AdminTool.getConfigureStoreViewQueryParams(finalCommandLine)); + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java index 639293063f..7de9aea049 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java @@ -89,6 +89,10 @@ public class ControllerApiConstants { public static final String INCLUDE_SYSTEM_STORES = "include_system_stores"; public static final String STORE_VIEW = "store_view"; + public static final String STORE_VIEW_NAME = "store_view_name"; + public static final String STORE_VIEW_CLASS = "store_view_class"; + public static final String STORE_VIEW_PARAMS = "store_view_params"; + public static final String DISABLE_STORE_VIEW = "disable_store_view"; public static final String NATIVE_REPLICATION_ENABLED = "native_replication_enabled"; public static final String PUSH_STREAM_SOURCE_ADDRESS = "push_stream_source_address"; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java index d87a8f5a51..63875bc637 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java @@ -15,6 +15,7 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.DATA_REPLICATION_POLICY; import static com.linkedin.venice.controllerapi.ControllerApiConstants.DISABLE_DAVINCI_PUSH_STATUS_STORE; import static com.linkedin.venice.controllerapi.ControllerApiConstants.DISABLE_META_STORE; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.DISABLE_STORE_VIEW; import static com.linkedin.venice.controllerapi.ControllerApiConstants.ENABLE_READS; import static com.linkedin.venice.controllerapi.ControllerApiConstants.ENABLE_WRITES; import static com.linkedin.venice.controllerapi.ControllerApiConstants.ETLED_PROXY_USER_ACCOUNT; @@ -49,6 +50,9 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORAGE_QUOTA_IN_BYTE; import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_MIGRATION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_CLASS; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_NAME; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_PARAMS; import static com.linkedin.venice.controllerapi.ControllerApiConstants.TIME_LAG_TO_GO_ONLINE; import static com.linkedin.venice.controllerapi.ControllerApiConstants.UPDATED_CONFIGS_LIST; import static com.linkedin.venice.controllerapi.ControllerApiConstants.VERSION; @@ -618,6 +622,38 @@ public Optional getMinCompactionLagSeconds() { return getLong(MIN_COMPACTION_LAG_SECONDS); } + public Optional getViewName() { + return getString(STORE_VIEW_NAME); + } + + public UpdateStoreQueryParams setViewName(String viewName) { + return (UpdateStoreQueryParams) add(STORE_VIEW_NAME, viewName); + } + + public Optional getViewClassName() { + return getString(STORE_VIEW_CLASS); + } + + public UpdateStoreQueryParams setViewClassName(String viewClassName) { + return (UpdateStoreQueryParams) add(STORE_VIEW_CLASS, viewClassName); + } + + public Optional> getViewClassParams() { + return getStringMap(STORE_VIEW_PARAMS); + } + + public UpdateStoreQueryParams setViewClassParams(Map partitionerParams) { + return (UpdateStoreQueryParams) putStringMap(STORE_VIEW_PARAMS, partitionerParams); + } + + public Optional getDisableStoreView() { + return getBoolean(DISABLE_STORE_VIEW); + } + + public UpdateStoreQueryParams setDisableStoreView() { + return (UpdateStoreQueryParams) add(DISABLE_STORE_VIEW, true); + } + // ***************** above this line are getters and setters ***************** private UpdateStoreQueryParams putInteger(String name, int value) { return (UpdateStoreQueryParams) add(name, value); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java index b8e90f305e..35f78fa5ac 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java @@ -55,6 +55,7 @@ import com.linkedin.venice.integration.utils.ZkServerWrapper; import com.linkedin.venice.meta.VeniceUserStoreType; import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; import com.linkedin.venice.pubsub.api.PubSubMessage; import com.linkedin.venice.pushmonitor.ExecutionStatus; @@ -545,13 +546,8 @@ public void testAAIngestionWithStoreView() throws Exception { Map viewConfig = new HashMap<>(); props.put(KAFKA_LINGER_MS, 0); viewConfig.put( - "testView", + "testViewWrong", "{\"viewClassName\" : \"" + TestView.class.getCanonicalName() + "\", \"viewParameters\" : {}}"); - - viewConfig.put( - "changeCaptureView", - "{\"viewClassName\" : \"" + ChangeCaptureView.class.getCanonicalName() - + "\", \"viewParameters\" : {\"kafka.linger.ms\": \"0\"}}"); UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(true) .setHybridRewindSeconds(500) .setHybridOffsetLagThreshold(8) @@ -561,11 +557,39 @@ public void testAAIngestionWithStoreView() throws Exception { MetricsRepository metricsRepository = new MetricsRepository(); ControllerClient setupControllerClient = createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms); - storeParms.setStoreViews(viewConfig); - // IntegrationTestPushUtils.updateStore(clusterName, props, storeParms); + UpdateStoreQueryParams storeParams1 = new UpdateStoreQueryParams().setStoreViews(viewConfig); + setupControllerClient + .retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParams1)); + UpdateStoreQueryParams storeParams2 = + new UpdateStoreQueryParams().setViewName("testViewWrong").setDisableStoreView(); + setupControllerClient + .retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParams2)); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + Map viewConfigMap = setupControllerClient.getStore(storeName).getStore().getViewConfigs(); + Assert.assertTrue(viewConfigMap.isEmpty()); + }); + + UpdateStoreQueryParams storeParams3 = new UpdateStoreQueryParams().setViewName("changeCaptureView") + .setViewClassName(ChangeCaptureView.class.getCanonicalName()) + .setViewClassParams(Collections.singletonMap("kafka.linger.ms", "0")); setupControllerClient - .retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParms)); - // controllerClient.updateStore(storeName, storeParms); + .retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParams3)); + + UpdateStoreQueryParams storeParams4 = + new UpdateStoreQueryParams().setViewName("testView").setViewClassName(TestView.class.getCanonicalName()); + setupControllerClient + .retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParams4)); + + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + Map viewConfigMap = setupControllerClient.getStore(storeName).getStore().getViewConfigs(); + Assert.assertEquals(viewConfigMap.size(), 2); + Assert.assertEquals(viewConfigMap.get("testView").getViewClassName(), TestView.class.getCanonicalName()); + Assert.assertEquals( + viewConfigMap.get("changeCaptureView").getViewClassName(), + ChangeCaptureView.class.getCanonicalName()); + Assert.assertEquals(viewConfigMap.get("changeCaptureView").getViewParameters().size(), 1); + }); + TestWriteUtils.runPushJob("Run push job", props); Map samzaConfig = getSamzaConfig(storeName); VeniceSystemFactory factory = new VeniceSystemFactory(); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreViewUtils.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreViewUtils.java index fd798e0971..3e363fdd6b 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreViewUtils.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreViewUtils.java @@ -20,8 +20,8 @@ public class StoreViewUtils { private static final VeniceJsonSerializer viewConfigVeniceJsonSerializer = new VeniceJsonSerializer<>(ViewConfig.class); - static Map convertStringMapViewToStoreViewConfigRecord(Map stringMap) - throws VeniceException { + static Map convertStringMapViewToStoreViewConfigRecordMap( + Map stringMap) throws VeniceException { Map mergedViewConfigRecords = new HashMap<>(); if (!stringMap.isEmpty()) { for (Map.Entry stringViewConfig: stringMap.entrySet()) { @@ -41,40 +41,41 @@ static Map convertStringMapViewToStoreViewConfigR return mergedViewConfigRecords; } - static Map convertStringMapViewToStoreViewConfig(Map stringMap) { + static Map convertStringMapViewToStoreViewConfigMap(Map stringMap) { Map mergedViewConfigRecords = new HashMap<>(); if (!stringMap.isEmpty()) { for (Map.Entry stringViewConfig: stringMap.entrySet()) { + StoreViewConfig newViewConfig; try { ViewConfig viewConfig = viewConfigVeniceJsonSerializer.deserialize(stringViewConfig.getValue().getBytes(), ""); - StoreViewConfig newViewConfig = new StoreViewConfig( + newViewConfig = new StoreViewConfig( viewConfig.getViewClassName(), CollectionUtils.getStringKeyCharSequenceValueMapFromStringMap(viewConfig.getViewParameters())); - mergedViewConfigRecords.put(stringViewConfig.getKey(), newViewConfig); } catch (IOException e) { LOGGER.error("Failed to serialize provided view config: {}", stringViewConfig.getValue()); throw new VeniceException("Failed to serialize provided view config:" + stringViewConfig.getValue(), e); } + mergedViewConfigRecords.put(stringViewConfig.getKey(), newViewConfig); } } return mergedViewConfigRecords; } - static Map convertStringMapViewToViewConfig(Map stringMap) { - return convertStringMapViewToStoreViewConfig(stringMap).entrySet() + static Map convertStringMapViewToViewConfigMap(Map stringMap) { + return convertStringMapViewToStoreViewConfigMap(stringMap).entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> new ViewConfigImpl(e.getValue()))); } - static Map convertViewConfigToStoreViewConfig(Map viewConfigMap) { + static Map convertViewConfigMapToStoreViewRecordMap( + Map viewConfigMap) { return viewConfigMap.entrySet() .stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - e -> new StoreViewConfigRecord( - e.getValue().getViewClassName(), - e.getValue().dataModel().getViewParameters()))); + .collect(Collectors.toMap(Map.Entry::getKey, e -> convertViewConfigToStoreViewConfigRecord(e.getValue()))); + } + + static StoreViewConfigRecord convertViewConfigToStoreViewConfigRecord(ViewConfig viewConfig) { + return new StoreViewConfigRecord(viewConfig.getViewClassName(), viewConfig.dataModel().getViewParameters()); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index d5dca89826..a4b22b48fd 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -4016,7 +4016,7 @@ private void setPushStreamSourceAddress(String clusterName, String storeName, St private void addStoreViews(String clusterName, String storeName, Map viewConfigMap) { storeMetadataUpdate(clusterName, storeName, store -> { - store.setViewConfigs(StoreViewUtils.convertStringMapViewToViewConfig(viewConfigMap)); + store.setViewConfigs(StoreViewUtils.convertStringMapViewToViewConfigMap(viewConfigMap)); return store; }); } @@ -4672,8 +4672,39 @@ static Map mergeNewViewConfigsIntoOldConfigs( oldViewConfigMap = new HashMap<>(); } Map mergedConfigs = - StoreViewUtils.convertViewConfigToStoreViewConfig(oldViewConfigMap); - mergedConfigs.putAll(StoreViewUtils.convertStringMapViewToStoreViewConfigRecord(viewParameters)); + StoreViewUtils.convertViewConfigMapToStoreViewRecordMap(oldViewConfigMap); + mergedConfigs.putAll(StoreViewUtils.convertStringMapViewToStoreViewConfigRecordMap(viewParameters)); + return mergedConfigs; + } + + static Map addNewViewConfigsIntoOldConfigs( + Store oldStore, + String viewClass, + ViewConfig viewConfig) throws VeniceException { + // Add new view config into the existing config map. The new configs will override existing ones which share the + // same key. + Map oldViewConfigMap = oldStore.getViewConfigs(); + if (oldViewConfigMap == null) { + oldViewConfigMap = new HashMap<>(); + } + Map mergedConfigs = + StoreViewUtils.convertViewConfigMapToStoreViewRecordMap(oldViewConfigMap); + + StoreViewConfigRecord newStoreViewConfigRecord = + StoreViewUtils.convertViewConfigToStoreViewConfigRecord(viewConfig); + mergedConfigs.put(viewClass, newStoreViewConfigRecord); + return mergedConfigs; + } + + static Map removeViewConfigFromStoreViewConfigMap(Store oldStore, String viewClass) + throws VeniceException { + Map oldViewConfigMap = oldStore.getViewConfigs(); + if (oldViewConfigMap == null) { + oldViewConfigMap = new HashMap<>(); + } + Map mergedConfigs = + StoreViewUtils.convertViewConfigMapToStoreViewRecordMap(oldViewConfigMap); + mergedConfigs.remove(viewClass); return mergedConfigs; } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 48d957abe7..3dfd532528 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -179,6 +179,7 @@ import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.meta.ViewConfig; +import com.linkedin.venice.meta.ViewConfigImpl; import com.linkedin.venice.persona.StoragePersona; import com.linkedin.venice.pubsub.PubSubConsumerAdapterFactory; import com.linkedin.venice.pubsub.PubSubTopicRepository; @@ -2150,6 +2151,10 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa Optional regionsFilter = params.getRegionsFilter(); Optional personaName = params.getStoragePersona(); Optional> storeViewConfig = params.getStoreViews(); + Optional viewName = params.getViewName(); + Optional viewClassName = params.getViewClassName(); + Optional> viewParams = params.getViewClassParams(); + Optional removeView = params.getDisableStoreView(); Optional latestSupersetSchemaId = params.getLatestSupersetSchemaId(); /** @@ -2203,12 +2208,30 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa pushStreamSourceAddress.map(addToUpdatedConfigList(updatedConfigsList, PUSH_STREAM_SOURCE_ADDRESS)) .orElseGet(currStore::getPushStreamSourceAddress); + if (storeViewConfig.isPresent() && viewName.isPresent()) { + throw new VeniceException("Cannot update a store view and overwrite store view setup together!"); + } + if (viewName.isPresent()) { + Map updatedViewSettings; + if (!removeView.isPresent()) { + if (!viewClassName.isPresent()) { + throw new VeniceException("View class name is required when configuring a view."); + } + // If View parameter is not provided, use emtpy map instead. It does not inherit from existing config. + ViewConfig viewConfig = new ViewConfigImpl(viewClassName.get(), viewParams.orElse(Collections.emptyMap())); + validateStoreViewConfig(currStore, viewConfig); + updatedViewSettings = VeniceHelixAdmin.addNewViewConfigsIntoOldConfigs(currStore, viewName.get(), viewConfig); + } else { + updatedViewSettings = VeniceHelixAdmin.removeViewConfigFromStoreViewConfigMap(currStore, viewName.get()); + } + setStore.views = updatedViewSettings; + updatedConfigsList.add(STORE_VIEW); + } + if (storeViewConfig.isPresent()) { - // Validate and merge store views if they're getting set - validateStoreViewConfig(storeViewConfig.get(), currStore); - Map mergedViewSettings = - VeniceHelixAdmin.mergeNewViewConfigsIntoOldConfigs(currStore, storeViewConfig.get()); - setStore.views = mergedViewSettings; + // Validate and overwrite store views if they're getting set + validateStoreViewConfigs(storeViewConfig.get(), currStore); + setStore.views = StoreViewUtils.convertStringMapViewToStoreViewConfigRecordMap(storeViewConfig.get()); updatedConfigsList.add(STORE_VIEW); } @@ -2585,19 +2608,20 @@ && getVeniceHelixAdmin().isHybrid(setStore.getHybridStoreConfig()) && setStore.g } } - private void validateStoreViewConfig(Map stringMap, Store store) { - Map configs = StoreViewUtils.convertStringMapViewToViewConfig(stringMap); - for (Map.Entry viewConfig: configs.entrySet()) { - // TODO: Pass a proper properties object here. Today this isn't used in this context - VeniceView view = ViewUtils.getVeniceView( - viewConfig.getValue().getViewClassName(), - new Properties(), - store, - viewConfig.getValue().getViewParameters()); - view.validateConfigs(); + private void validateStoreViewConfigs(Map stringMap, Store store) { + Map configs = StoreViewUtils.convertStringMapViewToViewConfigMap(stringMap); + for (Map.Entry viewConfigEntry: configs.entrySet()) { + validateStoreViewConfig(store, viewConfigEntry.getValue()); } } + private void validateStoreViewConfig(Store store, ViewConfig viewConfig) { + // TODO: Pass a proper properties object here. Today this isn't used in this context + VeniceView view = + ViewUtils.getVeniceView(viewConfig.getViewClassName(), new Properties(), store, viewConfig.getViewParameters()); + view.validateConfigs(); + } + private SupersetSchemaGenerator getSupersetSchemaGenerator(String clusterName) { if (externalSupersetSchemaGenerator.isPresent() && getMultiClusterConfigs().getControllerConfig(clusterName) .isParentExternalSupersetSchemaGenerationEnabled()) { diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java index 127090a329..926b533744 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java @@ -64,6 +64,7 @@ import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionImpl; +import com.linkedin.venice.meta.ViewConfigImpl; import com.linkedin.venice.meta.ZKStore; import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.partitioner.InvalidKeySchemaPartitioner; @@ -84,11 +85,13 @@ import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import com.linkedin.venice.views.ChangeCaptureView; import com.linkedin.venice.writer.VeniceWriter; import java.nio.ByteBuffer; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -1837,11 +1840,124 @@ public void testUpdateStoreNativeReplicationSourceFabric() { int schemaId = schemaCaptor.getValue(); AdminOperation adminMessage = adminOperationSerializer.deserialize(ByteBuffer.wrap(valueBytes), schemaId); UpdateStore updateStore = (UpdateStore) adminMessage.payloadUnion; - Assert.assertTrue( - "dc1".equals(updateStore.nativeReplicationSourceFabric.toString()), + Assert.assertEquals( + updateStore.nativeReplicationSourceFabric.toString(), + "dc1", "Native replication source fabric does not match after updating the store!"); } + @Test + public void testSetStoreViewConfig() { + String storeName = Utils.getUniqueString("testUpdateStore"); + Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()); + store.setActiveActiveReplicationEnabled(true); + store.setChunkingEnabled(true); + doReturn(store).when(internalAdmin).getStore(clusterName, storeName); + + doReturn(CompletableFuture.completedFuture(new SimplePubSubProduceResultImpl(topicName, partitionId, 1, -1))) + .when(veniceWriter) + .put(any(), any(), anyInt()); + + when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) + .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + + parentAdmin.initStorageCluster(clusterName); + Map viewConfig = new HashMap<>(); + viewConfig.put( + "changeCapture", + "{\"viewClassName\" : \"" + ChangeCaptureView.class.getCanonicalName() + "\", \"viewParameters\" : {}}"); + parentAdmin.updateStore(clusterName, storeName, new UpdateStoreQueryParams().setStoreViews(viewConfig)); + + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor valueCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor schemaCaptor = ArgumentCaptor.forClass(Integer.class); + + verify(veniceWriter, times(1)).put(keyCaptor.capture(), valueCaptor.capture(), schemaCaptor.capture()); + byte[] valueBytes = valueCaptor.getValue(); + int schemaId = schemaCaptor.getValue(); + AdminOperation adminMessage = adminOperationSerializer.deserialize(ByteBuffer.wrap(valueBytes), schemaId); + UpdateStore updateStore = (UpdateStore) adminMessage.payloadUnion; + Assert.assertTrue(updateStore.getViews().containsKey("changeCapture")); + } + + @Test + public void testInsertStoreViewConfig() { + String storeName = Utils.getUniqueString("testUpdateStore"); + Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()); + store.setActiveActiveReplicationEnabled(true); + store.setChunkingEnabled(true); + store.setViewConfigs( + Collections.singletonMap("testView", new ViewConfigImpl("testViewClassDummyName", Collections.emptyMap()))); + doReturn(store).when(internalAdmin).getStore(clusterName, storeName); + + doReturn(CompletableFuture.completedFuture(new SimplePubSubProduceResultImpl(topicName, partitionId, 1, -1))) + .when(veniceWriter) + .put(any(), any(), anyInt()); + + when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) + .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + + parentAdmin.initStorageCluster(clusterName); + parentAdmin.updateStore( + clusterName, + storeName, + new UpdateStoreQueryParams().setViewName("changeCapture") + .setViewClassName(ChangeCaptureView.class.getCanonicalName())); + + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor valueCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor schemaCaptor = ArgumentCaptor.forClass(Integer.class); + + verify(veniceWriter, times(1)).put(keyCaptor.capture(), valueCaptor.capture(), schemaCaptor.capture()); + byte[] valueBytes = valueCaptor.getValue(); + int schemaId = schemaCaptor.getValue(); + AdminOperation adminMessage = adminOperationSerializer.deserialize(ByteBuffer.wrap(valueBytes), schemaId); + UpdateStore updateStore = (UpdateStore) adminMessage.payloadUnion; + Assert.assertEquals(updateStore.getViews().size(), 2); + Assert.assertTrue(updateStore.getViews().containsKey("changeCapture")); + Assert.assertEquals( + updateStore.getViews().get("changeCapture").viewClassName.toString(), + ChangeCaptureView.class.getCanonicalName()); + Assert.assertTrue(updateStore.getViews().get("changeCapture").viewParameters.isEmpty()); + } + + @Test + public void testRemoveStoreViewConfig() { + String storeName = Utils.getUniqueString("testUpdateStore"); + Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()); + store.setActiveActiveReplicationEnabled(true); + store.setChunkingEnabled(true); + store.setViewConfigs( + Collections.singletonMap( + "changeCapture", + new ViewConfigImpl(ChangeCaptureView.class.getCanonicalName(), Collections.emptyMap()))); + doReturn(store).when(internalAdmin).getStore(clusterName, storeName); + + doReturn(CompletableFuture.completedFuture(new SimplePubSubProduceResultImpl(topicName, partitionId, 1, -1))) + .when(veniceWriter) + .put(any(), any(), anyInt()); + + when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) + .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + + parentAdmin.initStorageCluster(clusterName); + parentAdmin.updateStore( + clusterName, + storeName, + new UpdateStoreQueryParams().setViewName("changeCapture").setDisableStoreView()); + + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor valueCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor schemaCaptor = ArgumentCaptor.forClass(Integer.class); + + verify(veniceWriter, times(1)).put(keyCaptor.capture(), valueCaptor.capture(), schemaCaptor.capture()); + byte[] valueBytes = valueCaptor.getValue(); + int schemaId = schemaCaptor.getValue(); + AdminOperation adminMessage = adminOperationSerializer.deserialize(ByteBuffer.wrap(valueBytes), schemaId); + UpdateStore updateStore = (UpdateStore) adminMessage.payloadUnion; + Assert.assertEquals(updateStore.getViews().size(), 0); + } + @Test public void testUpdateStoreWithBadPartitionerConfigs() { String storeName = Utils.getUniqueString("testUpdateStore");