Skip to content

Commit

Permalink
[admin-tool][controller] Add new admin tool command to configure/remo…
Browse files Browse the repository at this point in the history
…ve particular view for store (#695)

This PR introduces a new AdminTool command to configure (add/update/remove) a view of a store.
The command name is --configure-store-view. It takes the following parameter:
--view-name: Required at all time. Value: Name of the view to act on.
--remove-view: If present, this action is to remove an existing view from the store config. No Value.
--view-class: Required when it is not removing a view. Value: View class name of the view.
--view-params: Optional. Value: Additional parameters for the view in the format of string to string map. If not specified, meaning it is going to use empty map, instead of inheriting existing value.

The actual logic is inside update store logic. The reason to separate it into a new command is to make it more user friendly to specify the intention, instead of putting everything into a Json map.
For the existing setView() API in UpdateStoreQueryParams, this PR changes the behavior from merging the input into existing view config to completely overwrite the old config, as it is overlapping with the functionality with the new command to some extent.
  • Loading branch information
sixpluszero authored Oct 24, 2023
1 parent 1ff609a commit c1a9ab1
Show file tree
Hide file tree
Showing 11 changed files with 358 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,9 @@ public static void main(String args[]) throws Exception {
case DUMP_INGESTION_STATE:
dumpIngestionState(cmd);
break;
case CONFIGURE_STORE_VIEW:
configureStoreView(cmd);
break;
default:
StringJoiner availableCommands = new StringJoiner(", ");
for (Command c: Command.values()) {
Expand Down Expand Up @@ -1013,7 +1016,6 @@ private static <TYPE> void genericParam(

private static void updateStore(CommandLine cmd) {
UpdateStoreQueryParams params = getUpdateStoreQueryParams(cmd);
params.setStoreViews(new HashMap<>());
String storeName = getRequiredArgument(cmd, Arg.STORE, Command.UPDATE_STORE);
ControllerResponse response = controllerClient.updateStore(storeName, params);
printSuccess(response);
Expand All @@ -1026,6 +1028,21 @@ private static void updateClusterConfig(CommandLine cmd) {
printSuccess(response);
}

static UpdateStoreQueryParams getConfigureStoreViewQueryParams(CommandLine cmd) {
Set<Arg> argSet = new HashSet<>(Arrays.asList(Command.CONFIGURE_STORE_VIEW.getOptionalArgs()));
argSet.addAll(new HashSet<>(Arrays.asList(Command.CONFIGURE_STORE_VIEW.getRequiredArgs())));
UpdateStoreQueryParams params = new UpdateStoreQueryParams();
params.setViewName(getRequiredArgument(cmd, Arg.VIEW_NAME));
if (cmd.hasOption(Arg.REMOVE_VIEW.toString())) {
params.setDisableStoreView();
} else {
// If configuring a view, view class name is required.
params.setViewClassName(getRequiredArgument(cmd, Arg.VIEW_CLASS));
}
stringMapParam(cmd, Arg.VIEW_PARAMS, p -> params.setViewClassParams(p), argSet);
return params;
}

static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) {
Set<Arg> argSet = new HashSet<>(Arrays.asList(Command.UPDATE_STORE.getOptionalArgs()));
argSet.addAll(new HashSet<>(Arrays.asList(Command.UPDATE_STORE.getRequiredArgs())));
Expand Down Expand Up @@ -2881,6 +2898,13 @@ private static void dumpIngestionState(CommandLine cmd) throws Exception {
}
}

private static void configureStoreView(CommandLine cmd) {
UpdateStoreQueryParams params = getConfigureStoreViewQueryParams(cmd);
String storeName = getRequiredArgument(cmd, Arg.STORE, Command.CONFIGURE_STORE_VIEW);
ControllerResponse response = controllerClient.updateStore(storeName, params);
printObject(response);
}

static void dumpIngestionState(TransportClient transportClient, String storeName, String version, String partition)
throws Exception {
StringBuilder sb = new StringBuilder(QueryAction.ADMIN.toString().toLowerCase()).append("/")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@ public enum Arg {
STORE_VIEW_CONFIGS(
"storage-view-configs", "svc", true,
"Config that describes views to be added for a store. Input is a json map. Example: {\"ExampleView\": {\"viewClassName\": \"com.linkedin.venice.views.ChangeCaptureView\",\"params\": {}}}"
),
), VIEW_NAME("view-name", "vn", true, "Name of a store view"),
VIEW_CLASS("view-class", "vc", true, "Name of a store view class"),
VIEW_PARAMS("view-params", "vp", true, "Additional parameter map of a store view class"),
REMOVE_VIEW("remove-view", "rv", false, "Optional config to specify to disable certain store view"),
PARTITION_DETAIL_ENABLED(
"partition-detail-enabled", "pde", true, "A flag to indicate whether to retrieve partition details"
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import static com.linkedin.venice.Arg.RECOVERY_COMMAND;
import static com.linkedin.venice.Arg.REGIONS_FILTER;
import static com.linkedin.venice.Arg.REGULAR_VERSION_ETL_ENABLED;
import static com.linkedin.venice.Arg.REMOVE_VIEW;
import static com.linkedin.venice.Arg.REPLICATE_ALL_CONFIGS;
import static com.linkedin.venice.Arg.REPLICATION_FACTOR;
import static com.linkedin.venice.Arg.RETRY;
Expand All @@ -107,6 +108,9 @@
import static com.linkedin.venice.Arg.VENICE_CLIENT_SSL_CONFIG_FILE;
import static com.linkedin.venice.Arg.VENICE_ZOOKEEPER_URL;
import static com.linkedin.venice.Arg.VERSION;
import static com.linkedin.venice.Arg.VIEW_CLASS;
import static com.linkedin.venice.Arg.VIEW_NAME;
import static com.linkedin.venice.Arg.VIEW_PARAMS;
import static com.linkedin.venice.Arg.VOLDEMORT_STORE;
import static com.linkedin.venice.Arg.VSON_STORE;
import static com.linkedin.venice.Arg.WRITEABILITY;
Expand Down Expand Up @@ -474,6 +478,10 @@ public enum Command {
"dump-ingestion-state",
"Dump the real-time ingestion state for a certain store version in a certain storage node",
new Arg[] { SERVER_URL, STORE, VERSION }, new Arg[] { PARTITION }
),
CONFIGURE_STORE_VIEW(
"configure-store-view", "Configure store view of a certain store", new Arg[] { URL, CLUSTER, STORE, VIEW_NAME },
new Arg[] { VIEW_CLASS, VIEW_PARAMS, REMOVE_VIEW }
);

private final String commandName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.views.ChangeCaptureView;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -284,4 +285,44 @@ public void testAdminToolDumpIngestionState() throws Exception {
doReturn(completableFuture).when(transportClient).get(anyString());
AdminTool.dumpIngestionState(transportClient, storeName, version, null);
}

@Test
public void testAdminConfigureView() throws ParseException, IOException {
final String K1 = "kafka.linger.ms", V1 = "0", K2 = "dummy.key", V2 = "dummy.val";

// Case 1: Happy path to setup a view.
String[] args = { "--configure-store-view", "--url", "http://localhost:7036", "--cluster", "test-cluster",
"--store", "testStore", "--view-name", "testView", "--view-class", ChangeCaptureView.class.getCanonicalName(),
"--view-params", K1 + "=" + V1 + "," + K2 + "=" + V2 };

CommandLine commandLine = AdminTool.getCommandLine(args);
UpdateStoreQueryParams params = AdminTool.getConfigureStoreViewQueryParams(commandLine);
Assert.assertTrue(params.getViewName().isPresent());
Assert.assertEquals(params.getViewName().get(), "testView");
Assert.assertTrue(params.getViewClassName().isPresent());
Assert.assertEquals(params.getViewClassName().get(), ChangeCaptureView.class.getCanonicalName());

Optional<Map<String, String>> viewParams = params.getViewClassParams();
Assert.assertTrue(viewParams.isPresent());
Map<String, String> viewParamsMap = viewParams.get();
Assert.assertEquals(viewParamsMap.get(K1), V1);
Assert.assertEquals(viewParamsMap.get(K2), V2);

// Case 2: Happy path to disable a view.
String[] args1 = { "--configure-store-view", "--url", "http://localhost:7036", "--cluster", "test-cluster",
"--store", "testStore", "--view-name", "testView", "--remove-view" };
commandLine = AdminTool.getCommandLine(args1);
params = AdminTool.getConfigureStoreViewQueryParams(commandLine);
Assert.assertTrue(params.getViewName().isPresent());
Assert.assertTrue(params.getDisableStoreView().isPresent());
Assert.assertEquals(params.getViewName().get(), "testView");
Assert.assertFalse(params.getViewClassName().isPresent());

// Case 3: Configure view with missing viewName;
String[] args2 = { "--configure-store-view", "--url", "http://localhost:7036", "--cluster", "test-cluster",
"--store", "testStore", "--view-name", "testView" };
commandLine = AdminTool.getCommandLine(args2);
CommandLine finalCommandLine = commandLine;
Assert.assertThrows(() -> AdminTool.getConfigureStoreViewQueryParams(finalCommandLine));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ public class ControllerApiConstants {
public static final String INCLUDE_SYSTEM_STORES = "include_system_stores";

public static final String STORE_VIEW = "store_view";
public static final String STORE_VIEW_NAME = "store_view_name";
public static final String STORE_VIEW_CLASS = "store_view_class";
public static final String STORE_VIEW_PARAMS = "store_view_params";
public static final String DISABLE_STORE_VIEW = "disable_store_view";

public static final String NATIVE_REPLICATION_ENABLED = "native_replication_enabled";
public static final String PUSH_STREAM_SOURCE_ADDRESS = "push_stream_source_address";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.DATA_REPLICATION_POLICY;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.DISABLE_DAVINCI_PUSH_STATUS_STORE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.DISABLE_META_STORE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.DISABLE_STORE_VIEW;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.ENABLE_READS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.ENABLE_WRITES;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.ETLED_PROXY_USER_ACCOUNT;
Expand Down Expand Up @@ -49,6 +50,9 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORAGE_QUOTA_IN_BYTE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_MIGRATION;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_CLASS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_NAME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VIEW_PARAMS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TIME_LAG_TO_GO_ONLINE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.UPDATED_CONFIGS_LIST;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.VERSION;
Expand Down Expand Up @@ -618,6 +622,38 @@ public Optional<Long> getMinCompactionLagSeconds() {
return getLong(MIN_COMPACTION_LAG_SECONDS);
}

public Optional<String> getViewName() {
return getString(STORE_VIEW_NAME);
}

public UpdateStoreQueryParams setViewName(String viewName) {
return (UpdateStoreQueryParams) add(STORE_VIEW_NAME, viewName);
}

public Optional<String> getViewClassName() {
return getString(STORE_VIEW_CLASS);
}

public UpdateStoreQueryParams setViewClassName(String viewClassName) {
return (UpdateStoreQueryParams) add(STORE_VIEW_CLASS, viewClassName);
}

public Optional<Map<String, String>> getViewClassParams() {
return getStringMap(STORE_VIEW_PARAMS);
}

public UpdateStoreQueryParams setViewClassParams(Map<String, String> partitionerParams) {
return (UpdateStoreQueryParams) putStringMap(STORE_VIEW_PARAMS, partitionerParams);
}

public Optional<Boolean> getDisableStoreView() {
return getBoolean(DISABLE_STORE_VIEW);
}

public UpdateStoreQueryParams setDisableStoreView() {
return (UpdateStoreQueryParams) add(DISABLE_STORE_VIEW, true);
}

// ***************** above this line are getters and setters *****************
private UpdateStoreQueryParams putInteger(String name, int value) {
return (UpdateStoreQueryParams) add(name, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import com.linkedin.venice.integration.utils.ZkServerWrapper;
import com.linkedin.venice.meta.VeniceUserStoreType;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.ViewConfig;
import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
Expand Down Expand Up @@ -545,13 +546,8 @@ public void testAAIngestionWithStoreView() throws Exception {
Map<String, String> viewConfig = new HashMap<>();
props.put(KAFKA_LINGER_MS, 0);
viewConfig.put(
"testView",
"testViewWrong",
"{\"viewClassName\" : \"" + TestView.class.getCanonicalName() + "\", \"viewParameters\" : {}}");

viewConfig.put(
"changeCaptureView",
"{\"viewClassName\" : \"" + ChangeCaptureView.class.getCanonicalName()
+ "\", \"viewParameters\" : {\"kafka.linger.ms\": \"0\"}}");
UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(true)
.setHybridRewindSeconds(500)
.setHybridOffsetLagThreshold(8)
Expand All @@ -561,11 +557,39 @@ public void testAAIngestionWithStoreView() throws Exception {
MetricsRepository metricsRepository = new MetricsRepository();
ControllerClient setupControllerClient =
createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms);
storeParms.setStoreViews(viewConfig);
// IntegrationTestPushUtils.updateStore(clusterName, props, storeParms);
UpdateStoreQueryParams storeParams1 = new UpdateStoreQueryParams().setStoreViews(viewConfig);
setupControllerClient
.retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParams1));
UpdateStoreQueryParams storeParams2 =
new UpdateStoreQueryParams().setViewName("testViewWrong").setDisableStoreView();
setupControllerClient
.retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParams2));
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> {
Map<String, ViewConfig> viewConfigMap = setupControllerClient.getStore(storeName).getStore().getViewConfigs();
Assert.assertTrue(viewConfigMap.isEmpty());
});

UpdateStoreQueryParams storeParams3 = new UpdateStoreQueryParams().setViewName("changeCaptureView")
.setViewClassName(ChangeCaptureView.class.getCanonicalName())
.setViewClassParams(Collections.singletonMap("kafka.linger.ms", "0"));
setupControllerClient
.retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParms));
// controllerClient.updateStore(storeName, storeParms);
.retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParams3));

UpdateStoreQueryParams storeParams4 =
new UpdateStoreQueryParams().setViewName("testView").setViewClassName(TestView.class.getCanonicalName());
setupControllerClient
.retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParams4));

TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> {
Map<String, ViewConfig> viewConfigMap = setupControllerClient.getStore(storeName).getStore().getViewConfigs();
Assert.assertEquals(viewConfigMap.size(), 2);
Assert.assertEquals(viewConfigMap.get("testView").getViewClassName(), TestView.class.getCanonicalName());
Assert.assertEquals(
viewConfigMap.get("changeCaptureView").getViewClassName(),
ChangeCaptureView.class.getCanonicalName());
Assert.assertEquals(viewConfigMap.get("changeCaptureView").getViewParameters().size(), 1);
});

TestWriteUtils.runPushJob("Run push job", props);
Map<String, String> samzaConfig = getSamzaConfig(storeName);
VeniceSystemFactory factory = new VeniceSystemFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public class StoreViewUtils {
private static final VeniceJsonSerializer<ViewConfig> viewConfigVeniceJsonSerializer =
new VeniceJsonSerializer<>(ViewConfig.class);

static Map<String, StoreViewConfigRecord> convertStringMapViewToStoreViewConfigRecord(Map<String, String> stringMap)
throws VeniceException {
static Map<String, StoreViewConfigRecord> convertStringMapViewToStoreViewConfigRecordMap(
Map<String, String> stringMap) throws VeniceException {
Map<String, StoreViewConfigRecord> mergedViewConfigRecords = new HashMap<>();
if (!stringMap.isEmpty()) {
for (Map.Entry<String, String> stringViewConfig: stringMap.entrySet()) {
Expand All @@ -41,40 +41,41 @@ static Map<String, StoreViewConfigRecord> convertStringMapViewToStoreViewConfigR
return mergedViewConfigRecords;
}

static Map<String, StoreViewConfig> convertStringMapViewToStoreViewConfig(Map<String, String> stringMap) {
static Map<String, StoreViewConfig> convertStringMapViewToStoreViewConfigMap(Map<String, String> stringMap) {
Map<String, StoreViewConfig> mergedViewConfigRecords = new HashMap<>();
if (!stringMap.isEmpty()) {
for (Map.Entry<String, String> stringViewConfig: stringMap.entrySet()) {
StoreViewConfig newViewConfig;
try {
ViewConfig viewConfig =
viewConfigVeniceJsonSerializer.deserialize(stringViewConfig.getValue().getBytes(), "");
StoreViewConfig newViewConfig = new StoreViewConfig(
newViewConfig = new StoreViewConfig(
viewConfig.getViewClassName(),
CollectionUtils.getStringKeyCharSequenceValueMapFromStringMap(viewConfig.getViewParameters()));
mergedViewConfigRecords.put(stringViewConfig.getKey(), newViewConfig);
} catch (IOException e) {
LOGGER.error("Failed to serialize provided view config: {}", stringViewConfig.getValue());
throw new VeniceException("Failed to serialize provided view config:" + stringViewConfig.getValue(), e);
}
mergedViewConfigRecords.put(stringViewConfig.getKey(), newViewConfig);
}
}
return mergedViewConfigRecords;
}

static Map<String, ViewConfig> convertStringMapViewToViewConfig(Map<String, String> stringMap) {
return convertStringMapViewToStoreViewConfig(stringMap).entrySet()
static Map<String, ViewConfig> convertStringMapViewToViewConfigMap(Map<String, String> stringMap) {
return convertStringMapViewToStoreViewConfigMap(stringMap).entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new ViewConfigImpl(e.getValue())));
}

static Map<String, StoreViewConfigRecord> convertViewConfigToStoreViewConfig(Map<String, ViewConfig> viewConfigMap) {
static Map<String, StoreViewConfigRecord> convertViewConfigMapToStoreViewRecordMap(
Map<String, ViewConfig> viewConfigMap) {
return viewConfigMap.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e -> new StoreViewConfigRecord(
e.getValue().getViewClassName(),
e.getValue().dataModel().getViewParameters())));
.collect(Collectors.toMap(Map.Entry::getKey, e -> convertViewConfigToStoreViewConfigRecord(e.getValue())));
}

static StoreViewConfigRecord convertViewConfigToStoreViewConfigRecord(ViewConfig viewConfig) {
return new StoreViewConfigRecord(viewConfig.getViewClassName(), viewConfig.dataModel().getViewParameters());
}
}
Loading

0 comments on commit c1a9ab1

Please sign in to comment.