diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java index 91f83a9bb4..82df29d470 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java @@ -542,6 +542,9 @@ public static void main(String args[]) throws Exception { case DUMP_INGESTION_STATE: dumpIngestionState(cmd); break; + case CONFIGURE_STORE_VIEW: + configureStoreView(cmd); + break; default: StringJoiner availableCommands = new StringJoiner(", "); for (Command c: Command.values()) { @@ -1013,7 +1016,6 @@ private static void genericParam( private static void updateStore(CommandLine cmd) { UpdateStoreQueryParams params = getUpdateStoreQueryParams(cmd); - params.setStoreViews(new HashMap<>()); String storeName = getRequiredArgument(cmd, Arg.STORE, Command.UPDATE_STORE); ControllerResponse response = controllerClient.updateStore(storeName, params); printSuccess(response); @@ -1026,6 +1028,21 @@ private static void updateClusterConfig(CommandLine cmd) { printSuccess(response); } + static UpdateStoreQueryParams getConfigureStoreViewQueryParams(CommandLine cmd) { + Set argSet = new HashSet<>(Arrays.asList(Command.CONFIGURE_STORE_VIEW.getOptionalArgs())); + argSet.addAll(new HashSet<>(Arrays.asList(Command.CONFIGURE_STORE_VIEW.getRequiredArgs()))); + UpdateStoreQueryParams params = new UpdateStoreQueryParams(); + params.setViewName(getRequiredArgument(cmd, Arg.VIEW_NAME)); + if (cmd.hasOption(Arg.REMOVE_VIEW.toString())) { + params.setDisableStoreView(); + } else { + // If configuring a view, view class name is required. + params.setViewClassName(getRequiredArgument(cmd, Arg.VIEW_CLASS)); + } + stringMapParam(cmd, Arg.VIEW_PARAMS, p -> params.setViewClassParams(p), argSet); + return params; + } + static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) { Set argSet = new HashSet<>(Arrays.asList(Command.UPDATE_STORE.getOptionalArgs())); argSet.addAll(new HashSet<>(Arrays.asList(Command.UPDATE_STORE.getRequiredArgs()))); @@ -2881,6 +2898,13 @@ private static void dumpIngestionState(CommandLine cmd) throws Exception { } } + private static void configureStoreView(CommandLine cmd) { + UpdateStoreQueryParams params = getConfigureStoreViewQueryParams(cmd); + String storeName = getRequiredArgument(cmd, Arg.STORE, Command.CONFIGURE_STORE_VIEW); + ControllerResponse response = controllerClient.updateStore(storeName, params); + printObject(response); + } + static void dumpIngestionState(TransportClient transportClient, String storeName, String version, String partition) throws Exception { StringBuilder sb = new StringBuilder(QueryAction.ADMIN.toString().toLowerCase()).append("/") diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java index a0d26b304e..44720e1769 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java @@ -217,7 +217,10 @@ public enum Arg { STORE_VIEW_CONFIGS( "storage-view-configs", "svc", true, "Config that describes views to be added for a store. Input is a json map. Example: {\"ExampleView\": {\"viewClassName\": \"com.linkedin.venice.views.ChangeCaptureView\",\"params\": {}}}" - ), + ), VIEW_NAME("view-name", "vn", true, "Name of a store view"), + VIEW_CLASS("view-class", "vc", true, "Name of a store view class"), + VIEW_PARAMS("view-params", "vp", true, "Additional parameter map of a store view class"), + REMOVE_VIEW("remove-view", "rv", false, "Optional config to specify to disable certain store view"), PARTITION_DETAIL_ENABLED( "partition-detail-enabled", "pde", true, "A flag to indicate whether to retrieve partition details" ), diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java index 79f30a80cd..a518b67a69 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java @@ -81,6 +81,7 @@ import static com.linkedin.venice.Arg.RECOVERY_COMMAND; import static com.linkedin.venice.Arg.REGIONS_FILTER; import static com.linkedin.venice.Arg.REGULAR_VERSION_ETL_ENABLED; +import static com.linkedin.venice.Arg.REMOVE_VIEW; import static com.linkedin.venice.Arg.REPLICATE_ALL_CONFIGS; import static com.linkedin.venice.Arg.REPLICATION_FACTOR; import static com.linkedin.venice.Arg.RETRY; @@ -107,6 +108,9 @@ import static com.linkedin.venice.Arg.VENICE_CLIENT_SSL_CONFIG_FILE; import static com.linkedin.venice.Arg.VENICE_ZOOKEEPER_URL; import static com.linkedin.venice.Arg.VERSION; +import static com.linkedin.venice.Arg.VIEW_CLASS; +import static com.linkedin.venice.Arg.VIEW_NAME; +import static com.linkedin.venice.Arg.VIEW_PARAMS; import static com.linkedin.venice.Arg.VOLDEMORT_STORE; import static com.linkedin.venice.Arg.VSON_STORE; import static com.linkedin.venice.Arg.WRITEABILITY; @@ -474,6 +478,10 @@ public enum Command { "dump-ingestion-state", "Dump the real-time ingestion state for a certain store version in a certain storage node", new Arg[] { SERVER_URL, STORE, VERSION }, new Arg[] { PARTITION } + ), + CONFIGURE_STORE_VIEW( + "configure-store-view", "Configure store view of a certain store", new Arg[] { URL, CLUSTER, STORE, VIEW_NAME }, + new Arg[] { VIEW_CLASS, VIEW_PARAMS, REMOVE_VIEW } ); private final String commandName; diff --git a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java index ba1612bc77..e7cef8c86e 100644 --- a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java +++ b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java @@ -28,6 +28,7 @@ import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serializer.FastSerializerDeserializerFactory; import com.linkedin.venice.serializer.RecordSerializer; +import com.linkedin.venice.views.ChangeCaptureView; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -284,4 +285,44 @@ public void testAdminToolDumpIngestionState() throws Exception { doReturn(completableFuture).when(transportClient).get(anyString()); AdminTool.dumpIngestionState(transportClient, storeName, version, null); } + + @Test + public void testAdminConfigureView() throws ParseException, IOException { + final String K1 = "kafka.linger.ms", V1 = "0", K2 = "dummy.key", V2 = "dummy.val"; + + // Case 1: Happy path to setup a view. + String[] args = { "--configure-store-view", "--url", "http://localhost:7036", "--cluster", "test-cluster", + "--store", "testStore", "--view-name", "testView", "--view-class", ChangeCaptureView.class.getCanonicalName(), + "--view-params", K1 + "=" + V1 + "," + K2 + "=" + V2 }; + + CommandLine commandLine = AdminTool.getCommandLine(args); + UpdateStoreQueryParams params = AdminTool.getConfigureStoreViewQueryParams(commandLine); + Assert.assertTrue(params.getViewName().isPresent()); + Assert.assertEquals(params.getViewName().get(), "testView"); + Assert.assertTrue(params.getViewClassName().isPresent()); + Assert.assertEquals(params.getViewClassName().get(), ChangeCaptureView.class.getCanonicalName()); + + Optional> viewParams = params.getViewClassParams(); + Assert.assertTrue(viewParams.isPresent()); + Map viewParamsMap = viewParams.get(); + Assert.assertEquals(viewParamsMap.get(K1), V1); + Assert.assertEquals(viewParamsMap.get(K2), V2); + + // Case 2: Happy path to disable a view. + String[] args1 = { "--configure-store-view", "--url", "http://localhost:7036", "--cluster", "test-cluster", + "--store", "testStore", "--view-name", "testView", "--remove-view" }; + commandLine = AdminTool.getCommandLine(args1); + params = AdminTool.getConfigureStoreViewQueryParams(commandLine); + Assert.assertTrue(params.getViewName().isPresent()); + Assert.assertTrue(params.getDisableStoreView().isPresent()); + Assert.assertEquals(params.getViewName().get(), "testView"); + Assert.assertFalse(params.getViewClassName().isPresent()); + + // Case 3: Configure view with missing viewName; + String[] args2 = { "--configure-store-view", "--url", "http://localhost:7036", "--cluster", "test-cluster", + "--store", "testStore", "--view-name", "testView" }; + commandLine = AdminTool.getCommandLine(args2); + CommandLine finalCommandLine = commandLine; + Assert.assertThrows(() -> AdminTool.getConfigureStoreViewQueryParams(finalCommandLine)); + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java index 639293063f..7de9aea049 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java @@ -89,6 +89,10 @@ public class ControllerApiConstants { public static final String INCLUDE_SYSTEM_STORES = "include_system_stores"; public static final String STORE_VIEW = "store_view"; + public static final String STORE_VIEW_NAME = "store_view_name"; + public static final String STORE_VIEW_CLASS = "store_view_class"; + public static final String STORE_VIEW_PARAMS = "store_view_params"; + public static final String DISABLE_STORE_VIEW = "disable_store_view"; public static final String NATIVE_REPLICATION_ENABLED = "native_replication_enabled"; public static final String PUSH_STREAM_SOURCE_ADDRESS = "push_stream_source_address"; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java index d87a8f5a51..63875bc637 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java @@ -15,6 +15,7 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.DATA_REPLICATION_POLICY; import static com.linkedin.venice.controllerapi.ControllerApiConstants.DISABLE_DAVINCI_PUSH_STATUS_STORE; import static com.linkedin.venice.controllerapi.ControllerApiConstants.DISABLE_META_STORE; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.DISABLE_STORE_VIEW; import static com.linkedin.venice.controllerapi.ControllerApiConstants.ENABLE_READS; import static com.linkedin.venice.controllerapi.ControllerApiConstants.ENABLE_WRITES; import static com.linkedin.venice.controllerapi.ControllerApiConstants.ETLED_PROXY_USER_ACCOUNT; @@ -49,6 +50,9 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORAGE_QUOTA_IN_BYTE; import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_MIGRATION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_CLASS; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_NAME; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_PARAMS; import static com.linkedin.venice.controllerapi.ControllerApiConstants.TIME_LAG_TO_GO_ONLINE; import static com.linkedin.venice.controllerapi.ControllerApiConstants.UPDATED_CONFIGS_LIST; import static com.linkedin.venice.controllerapi.ControllerApiConstants.VERSION; @@ -618,6 +622,38 @@ public Optional getMinCompactionLagSeconds() { return getLong(MIN_COMPACTION_LAG_SECONDS); } + public Optional getViewName() { + return getString(STORE_VIEW_NAME); + } + + public UpdateStoreQueryParams setViewName(String viewName) { + return (UpdateStoreQueryParams) add(STORE_VIEW_NAME, viewName); + } + + public Optional getViewClassName() { + return getString(STORE_VIEW_CLASS); + } + + public UpdateStoreQueryParams setViewClassName(String viewClassName) { + return (UpdateStoreQueryParams) add(STORE_VIEW_CLASS, viewClassName); + } + + public Optional> getViewClassParams() { + return getStringMap(STORE_VIEW_PARAMS); + } + + public UpdateStoreQueryParams setViewClassParams(Map partitionerParams) { + return (UpdateStoreQueryParams) putStringMap(STORE_VIEW_PARAMS, partitionerParams); + } + + public Optional getDisableStoreView() { + return getBoolean(DISABLE_STORE_VIEW); + } + + public UpdateStoreQueryParams setDisableStoreView() { + return (UpdateStoreQueryParams) add(DISABLE_STORE_VIEW, true); + } + // ***************** above this line are getters and setters ***************** private UpdateStoreQueryParams putInteger(String name, int value) { return (UpdateStoreQueryParams) add(name, value); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java index b8e90f305e..35f78fa5ac 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveIngestion.java @@ -55,6 +55,7 @@ import com.linkedin.venice.integration.utils.ZkServerWrapper; import com.linkedin.venice.meta.VeniceUserStoreType; import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; import com.linkedin.venice.pubsub.api.PubSubMessage; import com.linkedin.venice.pushmonitor.ExecutionStatus; @@ -545,13 +546,8 @@ public void testAAIngestionWithStoreView() throws Exception { Map viewConfig = new HashMap<>(); props.put(KAFKA_LINGER_MS, 0); viewConfig.put( - "testView", + "testViewWrong", "{\"viewClassName\" : \"" + TestView.class.getCanonicalName() + "\", \"viewParameters\" : {}}"); - - viewConfig.put( - "changeCaptureView", - "{\"viewClassName\" : \"" + ChangeCaptureView.class.getCanonicalName() - + "\", \"viewParameters\" : {\"kafka.linger.ms\": \"0\"}}"); UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(true) .setHybridRewindSeconds(500) .setHybridOffsetLagThreshold(8) @@ -561,11 +557,39 @@ public void testAAIngestionWithStoreView() throws Exception { MetricsRepository metricsRepository = new MetricsRepository(); ControllerClient setupControllerClient = createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms); - storeParms.setStoreViews(viewConfig); - // IntegrationTestPushUtils.updateStore(clusterName, props, storeParms); + UpdateStoreQueryParams storeParams1 = new UpdateStoreQueryParams().setStoreViews(viewConfig); + setupControllerClient + .retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParams1)); + UpdateStoreQueryParams storeParams2 = + new UpdateStoreQueryParams().setViewName("testViewWrong").setDisableStoreView(); + setupControllerClient + .retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParams2)); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + Map viewConfigMap = setupControllerClient.getStore(storeName).getStore().getViewConfigs(); + Assert.assertTrue(viewConfigMap.isEmpty()); + }); + + UpdateStoreQueryParams storeParams3 = new UpdateStoreQueryParams().setViewName("changeCaptureView") + .setViewClassName(ChangeCaptureView.class.getCanonicalName()) + .setViewClassParams(Collections.singletonMap("kafka.linger.ms", "0")); setupControllerClient - .retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParms)); - // controllerClient.updateStore(storeName, storeParms); + .retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParams3)); + + UpdateStoreQueryParams storeParams4 = + new UpdateStoreQueryParams().setViewName("testView").setViewClassName(TestView.class.getCanonicalName()); + setupControllerClient + .retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParams4)); + + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + Map viewConfigMap = setupControllerClient.getStore(storeName).getStore().getViewConfigs(); + Assert.assertEquals(viewConfigMap.size(), 2); + Assert.assertEquals(viewConfigMap.get("testView").getViewClassName(), TestView.class.getCanonicalName()); + Assert.assertEquals( + viewConfigMap.get("changeCaptureView").getViewClassName(), + ChangeCaptureView.class.getCanonicalName()); + Assert.assertEquals(viewConfigMap.get("changeCaptureView").getViewParameters().size(), 1); + }); + TestWriteUtils.runPushJob("Run push job", props); Map samzaConfig = getSamzaConfig(storeName); VeniceSystemFactory factory = new VeniceSystemFactory(); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreViewUtils.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreViewUtils.java index fd798e0971..3e363fdd6b 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreViewUtils.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreViewUtils.java @@ -20,8 +20,8 @@ public class StoreViewUtils { private static final VeniceJsonSerializer viewConfigVeniceJsonSerializer = new VeniceJsonSerializer<>(ViewConfig.class); - static Map convertStringMapViewToStoreViewConfigRecord(Map stringMap) - throws VeniceException { + static Map convertStringMapViewToStoreViewConfigRecordMap( + Map stringMap) throws VeniceException { Map mergedViewConfigRecords = new HashMap<>(); if (!stringMap.isEmpty()) { for (Map.Entry stringViewConfig: stringMap.entrySet()) { @@ -41,40 +41,41 @@ static Map convertStringMapViewToStoreViewConfigR return mergedViewConfigRecords; } - static Map convertStringMapViewToStoreViewConfig(Map stringMap) { + static Map convertStringMapViewToStoreViewConfigMap(Map stringMap) { Map mergedViewConfigRecords = new HashMap<>(); if (!stringMap.isEmpty()) { for (Map.Entry stringViewConfig: stringMap.entrySet()) { + StoreViewConfig newViewConfig; try { ViewConfig viewConfig = viewConfigVeniceJsonSerializer.deserialize(stringViewConfig.getValue().getBytes(), ""); - StoreViewConfig newViewConfig = new StoreViewConfig( + newViewConfig = new StoreViewConfig( viewConfig.getViewClassName(), CollectionUtils.getStringKeyCharSequenceValueMapFromStringMap(viewConfig.getViewParameters())); - mergedViewConfigRecords.put(stringViewConfig.getKey(), newViewConfig); } catch (IOException e) { LOGGER.error("Failed to serialize provided view config: {}", stringViewConfig.getValue()); throw new VeniceException("Failed to serialize provided view config:" + stringViewConfig.getValue(), e); } + mergedViewConfigRecords.put(stringViewConfig.getKey(), newViewConfig); } } return mergedViewConfigRecords; } - static Map convertStringMapViewToViewConfig(Map stringMap) { - return convertStringMapViewToStoreViewConfig(stringMap).entrySet() + static Map convertStringMapViewToViewConfigMap(Map stringMap) { + return convertStringMapViewToStoreViewConfigMap(stringMap).entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> new ViewConfigImpl(e.getValue()))); } - static Map convertViewConfigToStoreViewConfig(Map viewConfigMap) { + static Map convertViewConfigMapToStoreViewRecordMap( + Map viewConfigMap) { return viewConfigMap.entrySet() .stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - e -> new StoreViewConfigRecord( - e.getValue().getViewClassName(), - e.getValue().dataModel().getViewParameters()))); + .collect(Collectors.toMap(Map.Entry::getKey, e -> convertViewConfigToStoreViewConfigRecord(e.getValue()))); + } + + static StoreViewConfigRecord convertViewConfigToStoreViewConfigRecord(ViewConfig viewConfig) { + return new StoreViewConfigRecord(viewConfig.getViewClassName(), viewConfig.dataModel().getViewParameters()); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index d5dca89826..a4b22b48fd 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -4016,7 +4016,7 @@ private void setPushStreamSourceAddress(String clusterName, String storeName, St private void addStoreViews(String clusterName, String storeName, Map viewConfigMap) { storeMetadataUpdate(clusterName, storeName, store -> { - store.setViewConfigs(StoreViewUtils.convertStringMapViewToViewConfig(viewConfigMap)); + store.setViewConfigs(StoreViewUtils.convertStringMapViewToViewConfigMap(viewConfigMap)); return store; }); } @@ -4672,8 +4672,39 @@ static Map mergeNewViewConfigsIntoOldConfigs( oldViewConfigMap = new HashMap<>(); } Map mergedConfigs = - StoreViewUtils.convertViewConfigToStoreViewConfig(oldViewConfigMap); - mergedConfigs.putAll(StoreViewUtils.convertStringMapViewToStoreViewConfigRecord(viewParameters)); + StoreViewUtils.convertViewConfigMapToStoreViewRecordMap(oldViewConfigMap); + mergedConfigs.putAll(StoreViewUtils.convertStringMapViewToStoreViewConfigRecordMap(viewParameters)); + return mergedConfigs; + } + + static Map addNewViewConfigsIntoOldConfigs( + Store oldStore, + String viewClass, + ViewConfig viewConfig) throws VeniceException { + // Add new view config into the existing config map. The new configs will override existing ones which share the + // same key. + Map oldViewConfigMap = oldStore.getViewConfigs(); + if (oldViewConfigMap == null) { + oldViewConfigMap = new HashMap<>(); + } + Map mergedConfigs = + StoreViewUtils.convertViewConfigMapToStoreViewRecordMap(oldViewConfigMap); + + StoreViewConfigRecord newStoreViewConfigRecord = + StoreViewUtils.convertViewConfigToStoreViewConfigRecord(viewConfig); + mergedConfigs.put(viewClass, newStoreViewConfigRecord); + return mergedConfigs; + } + + static Map removeViewConfigFromStoreViewConfigMap(Store oldStore, String viewClass) + throws VeniceException { + Map oldViewConfigMap = oldStore.getViewConfigs(); + if (oldViewConfigMap == null) { + oldViewConfigMap = new HashMap<>(); + } + Map mergedConfigs = + StoreViewUtils.convertViewConfigMapToStoreViewRecordMap(oldViewConfigMap); + mergedConfigs.remove(viewClass); return mergedConfigs; } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 48d957abe7..3dfd532528 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -179,6 +179,7 @@ import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.meta.ViewConfig; +import com.linkedin.venice.meta.ViewConfigImpl; import com.linkedin.venice.persona.StoragePersona; import com.linkedin.venice.pubsub.PubSubConsumerAdapterFactory; import com.linkedin.venice.pubsub.PubSubTopicRepository; @@ -2150,6 +2151,10 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa Optional regionsFilter = params.getRegionsFilter(); Optional personaName = params.getStoragePersona(); Optional> storeViewConfig = params.getStoreViews(); + Optional viewName = params.getViewName(); + Optional viewClassName = params.getViewClassName(); + Optional> viewParams = params.getViewClassParams(); + Optional removeView = params.getDisableStoreView(); Optional latestSupersetSchemaId = params.getLatestSupersetSchemaId(); /** @@ -2203,12 +2208,30 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa pushStreamSourceAddress.map(addToUpdatedConfigList(updatedConfigsList, PUSH_STREAM_SOURCE_ADDRESS)) .orElseGet(currStore::getPushStreamSourceAddress); + if (storeViewConfig.isPresent() && viewName.isPresent()) { + throw new VeniceException("Cannot update a store view and overwrite store view setup together!"); + } + if (viewName.isPresent()) { + Map updatedViewSettings; + if (!removeView.isPresent()) { + if (!viewClassName.isPresent()) { + throw new VeniceException("View class name is required when configuring a view."); + } + // If View parameter is not provided, use emtpy map instead. It does not inherit from existing config. + ViewConfig viewConfig = new ViewConfigImpl(viewClassName.get(), viewParams.orElse(Collections.emptyMap())); + validateStoreViewConfig(currStore, viewConfig); + updatedViewSettings = VeniceHelixAdmin.addNewViewConfigsIntoOldConfigs(currStore, viewName.get(), viewConfig); + } else { + updatedViewSettings = VeniceHelixAdmin.removeViewConfigFromStoreViewConfigMap(currStore, viewName.get()); + } + setStore.views = updatedViewSettings; + updatedConfigsList.add(STORE_VIEW); + } + if (storeViewConfig.isPresent()) { - // Validate and merge store views if they're getting set - validateStoreViewConfig(storeViewConfig.get(), currStore); - Map mergedViewSettings = - VeniceHelixAdmin.mergeNewViewConfigsIntoOldConfigs(currStore, storeViewConfig.get()); - setStore.views = mergedViewSettings; + // Validate and overwrite store views if they're getting set + validateStoreViewConfigs(storeViewConfig.get(), currStore); + setStore.views = StoreViewUtils.convertStringMapViewToStoreViewConfigRecordMap(storeViewConfig.get()); updatedConfigsList.add(STORE_VIEW); } @@ -2585,19 +2608,20 @@ && getVeniceHelixAdmin().isHybrid(setStore.getHybridStoreConfig()) && setStore.g } } - private void validateStoreViewConfig(Map stringMap, Store store) { - Map configs = StoreViewUtils.convertStringMapViewToViewConfig(stringMap); - for (Map.Entry viewConfig: configs.entrySet()) { - // TODO: Pass a proper properties object here. Today this isn't used in this context - VeniceView view = ViewUtils.getVeniceView( - viewConfig.getValue().getViewClassName(), - new Properties(), - store, - viewConfig.getValue().getViewParameters()); - view.validateConfigs(); + private void validateStoreViewConfigs(Map stringMap, Store store) { + Map configs = StoreViewUtils.convertStringMapViewToViewConfigMap(stringMap); + for (Map.Entry viewConfigEntry: configs.entrySet()) { + validateStoreViewConfig(store, viewConfigEntry.getValue()); } } + private void validateStoreViewConfig(Store store, ViewConfig viewConfig) { + // TODO: Pass a proper properties object here. Today this isn't used in this context + VeniceView view = + ViewUtils.getVeniceView(viewConfig.getViewClassName(), new Properties(), store, viewConfig.getViewParameters()); + view.validateConfigs(); + } + private SupersetSchemaGenerator getSupersetSchemaGenerator(String clusterName) { if (externalSupersetSchemaGenerator.isPresent() && getMultiClusterConfigs().getControllerConfig(clusterName) .isParentExternalSupersetSchemaGenerationEnabled()) { diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java index 127090a329..926b533744 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java @@ -64,6 +64,7 @@ import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionImpl; +import com.linkedin.venice.meta.ViewConfigImpl; import com.linkedin.venice.meta.ZKStore; import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.partitioner.InvalidKeySchemaPartitioner; @@ -84,11 +85,13 @@ import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import com.linkedin.venice.views.ChangeCaptureView; import com.linkedin.venice.writer.VeniceWriter; import java.nio.ByteBuffer; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -1837,11 +1840,124 @@ public void testUpdateStoreNativeReplicationSourceFabric() { int schemaId = schemaCaptor.getValue(); AdminOperation adminMessage = adminOperationSerializer.deserialize(ByteBuffer.wrap(valueBytes), schemaId); UpdateStore updateStore = (UpdateStore) adminMessage.payloadUnion; - Assert.assertTrue( - "dc1".equals(updateStore.nativeReplicationSourceFabric.toString()), + Assert.assertEquals( + updateStore.nativeReplicationSourceFabric.toString(), + "dc1", "Native replication source fabric does not match after updating the store!"); } + @Test + public void testSetStoreViewConfig() { + String storeName = Utils.getUniqueString("testUpdateStore"); + Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()); + store.setActiveActiveReplicationEnabled(true); + store.setChunkingEnabled(true); + doReturn(store).when(internalAdmin).getStore(clusterName, storeName); + + doReturn(CompletableFuture.completedFuture(new SimplePubSubProduceResultImpl(topicName, partitionId, 1, -1))) + .when(veniceWriter) + .put(any(), any(), anyInt()); + + when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) + .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + + parentAdmin.initStorageCluster(clusterName); + Map viewConfig = new HashMap<>(); + viewConfig.put( + "changeCapture", + "{\"viewClassName\" : \"" + ChangeCaptureView.class.getCanonicalName() + "\", \"viewParameters\" : {}}"); + parentAdmin.updateStore(clusterName, storeName, new UpdateStoreQueryParams().setStoreViews(viewConfig)); + + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor valueCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor schemaCaptor = ArgumentCaptor.forClass(Integer.class); + + verify(veniceWriter, times(1)).put(keyCaptor.capture(), valueCaptor.capture(), schemaCaptor.capture()); + byte[] valueBytes = valueCaptor.getValue(); + int schemaId = schemaCaptor.getValue(); + AdminOperation adminMessage = adminOperationSerializer.deserialize(ByteBuffer.wrap(valueBytes), schemaId); + UpdateStore updateStore = (UpdateStore) adminMessage.payloadUnion; + Assert.assertTrue(updateStore.getViews().containsKey("changeCapture")); + } + + @Test + public void testInsertStoreViewConfig() { + String storeName = Utils.getUniqueString("testUpdateStore"); + Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()); + store.setActiveActiveReplicationEnabled(true); + store.setChunkingEnabled(true); + store.setViewConfigs( + Collections.singletonMap("testView", new ViewConfigImpl("testViewClassDummyName", Collections.emptyMap()))); + doReturn(store).when(internalAdmin).getStore(clusterName, storeName); + + doReturn(CompletableFuture.completedFuture(new SimplePubSubProduceResultImpl(topicName, partitionId, 1, -1))) + .when(veniceWriter) + .put(any(), any(), anyInt()); + + when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) + .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + + parentAdmin.initStorageCluster(clusterName); + parentAdmin.updateStore( + clusterName, + storeName, + new UpdateStoreQueryParams().setViewName("changeCapture") + .setViewClassName(ChangeCaptureView.class.getCanonicalName())); + + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor valueCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor schemaCaptor = ArgumentCaptor.forClass(Integer.class); + + verify(veniceWriter, times(1)).put(keyCaptor.capture(), valueCaptor.capture(), schemaCaptor.capture()); + byte[] valueBytes = valueCaptor.getValue(); + int schemaId = schemaCaptor.getValue(); + AdminOperation adminMessage = adminOperationSerializer.deserialize(ByteBuffer.wrap(valueBytes), schemaId); + UpdateStore updateStore = (UpdateStore) adminMessage.payloadUnion; + Assert.assertEquals(updateStore.getViews().size(), 2); + Assert.assertTrue(updateStore.getViews().containsKey("changeCapture")); + Assert.assertEquals( + updateStore.getViews().get("changeCapture").viewClassName.toString(), + ChangeCaptureView.class.getCanonicalName()); + Assert.assertTrue(updateStore.getViews().get("changeCapture").viewParameters.isEmpty()); + } + + @Test + public void testRemoveStoreViewConfig() { + String storeName = Utils.getUniqueString("testUpdateStore"); + Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()); + store.setActiveActiveReplicationEnabled(true); + store.setChunkingEnabled(true); + store.setViewConfigs( + Collections.singletonMap( + "changeCapture", + new ViewConfigImpl(ChangeCaptureView.class.getCanonicalName(), Collections.emptyMap()))); + doReturn(store).when(internalAdmin).getStore(clusterName, storeName); + + doReturn(CompletableFuture.completedFuture(new SimplePubSubProduceResultImpl(topicName, partitionId, 1, -1))) + .when(veniceWriter) + .put(any(), any(), anyInt()); + + when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) + .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + + parentAdmin.initStorageCluster(clusterName); + parentAdmin.updateStore( + clusterName, + storeName, + new UpdateStoreQueryParams().setViewName("changeCapture").setDisableStoreView()); + + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor valueCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor schemaCaptor = ArgumentCaptor.forClass(Integer.class); + + verify(veniceWriter, times(1)).put(keyCaptor.capture(), valueCaptor.capture(), schemaCaptor.capture()); + byte[] valueBytes = valueCaptor.getValue(); + int schemaId = schemaCaptor.getValue(); + AdminOperation adminMessage = adminOperationSerializer.deserialize(ByteBuffer.wrap(valueBytes), schemaId); + UpdateStore updateStore = (UpdateStore) adminMessage.payloadUnion; + Assert.assertEquals(updateStore.getViews().size(), 0); + } + @Test public void testUpdateStoreWithBadPartitionerConfigs() { String storeName = Utils.getUniqueString("testUpdateStore");