diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java index 256113d46f..f021fc61fb 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java @@ -584,9 +584,6 @@ public static void main(String[] args) throws Exception { case DUMP_HOST_HEARTBEAT: dumpHostHeartbeat(cmd); break; - case CREATE_REAL_TIME_TOPIC: - createRealTimeTopic(cmd); - break; default: StringJoiner availableCommands = new StringJoiner(", "); for (Command c: Command.values()) { @@ -1179,7 +1176,6 @@ static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) { genericParam(cmd, Arg.TARGET_SWAP_REGION, s -> s, p -> params.setTargetRegionSwap(p), argSet); integerParam(cmd, Arg.TARGET_SWAP_REGION_WAIT_TIME, p -> params.setTargetRegionSwapWaitTime(p), argSet); booleanParam(cmd, Arg.DAVINCI_HEARTBEAT_REPORTED, p -> params.setIsDavinciHeartbeatReported(p), argSet); - genericParam(cmd, Arg.SET_REAL_TIME_TOPIC, s -> s, params::setRealTimeTopicName, argSet); /** * {@link Arg#REPLICATE_ALL_CONFIGS} doesn't require parameters; once specified, it means true. @@ -3399,11 +3395,4 @@ private static PubSubConsumerAdapter getConsumer( return pubSubClientsFactory.getConsumerAdapterFactory() .create(new VeniceProperties(consumerProps), false, pubSubMessageDeserializer, "admin-tool-topic-dumper"); } - - private static void createRealTimeTopic(CommandLine cmd) { - String storeName = getRequiredArgument(cmd, Arg.STORE, Command.CREATE_REAL_TIME_TOPIC); - String partitionNum = getOptionalArgument(cmd, Arg.PARTITION_COUNT); - PartitionResponse response = controllerClient.createRealTimeTopic(storeName, partitionNum); - printSuccess(response); - } } diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java index b223333e0d..75b2115a49 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java @@ -294,8 +294,7 @@ public enum Arg { ), DAVINCI_HEARTBEAT_REPORTED( "dvc-heartbeat-reported", "dvchb", true, "Flag to indicate whether DVC is bootstrapping and sending heartbeats" - ), ENABLE_STORE_MIGRATION("enable-store-migration", "esm", true, "Toggle store migration store config"), - SET_REAL_TIME_TOPIC("SET-real-time-topic", "rttn", true, "Set a new real time topic"); + ), ENABLE_STORE_MIGRATION("enable-store-migration", "esm", true, "Toggle store migration store config"); private final String argName; private final String first; diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java index 8dccd8b856..51828eb3ae 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java @@ -108,7 +108,6 @@ import static com.linkedin.venice.Arg.SEPARATE_REALTIME_TOPIC_ENABLED; import static com.linkedin.venice.Arg.SERVER_KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND; import static com.linkedin.venice.Arg.SERVER_URL; -import static com.linkedin.venice.Arg.SET_REAL_TIME_TOPIC; import static com.linkedin.venice.Arg.SKIP_DIV; import static com.linkedin.venice.Arg.SKIP_LAST_STORE_CREATION; import static com.linkedin.venice.Arg.SOURCE_FABRIC; @@ -280,7 +279,7 @@ public enum Command { MAX_COMPACTION_LAG_SECONDS, MAX_RECORD_SIZE_BYTES, MAX_NEARLINE_RECORD_SIZE_BYTES, UNUSED_SCHEMA_DELETION_ENABLED, BLOB_TRANSFER_ENABLED, SEPARATE_REALTIME_TOPIC_ENABLED, NEARLINE_PRODUCER_COMPRESSION_ENABLED, NEARLINE_PRODUCER_COUNT_PER_WRITER, TARGET_SWAP_REGION, - TARGET_SWAP_REGION_WAIT_TIME, DAVINCI_HEARTBEAT_REPORTED, ENABLE_STORE_MIGRATION, SET_REAL_TIME_TOPIC } + TARGET_SWAP_REGION_WAIT_TIME, DAVINCI_HEARTBEAT_REPORTED, ENABLE_STORE_MIGRATION } ), UPDATE_CLUSTER_CONFIG( "update-cluster-config", "Update live cluster configs", new Arg[] { URL, CLUSTER }, @@ -563,10 +562,6 @@ public enum Command { "dump-host-heartbeat", "Dump all heartbeat belong to a certain storage node. You can use topic/partition to filter specific resource, and you can choose to filter resources that are lagging.", new Arg[] { SERVER_URL, KAFKA_TOPIC_NAME }, new Arg[] { PARTITION, LAG_FILTER_ENABLED } - ), - CREATE_REAL_TIME_TOPIC( - "create-real-time-topic", "Create a real time topic for an existing store", new Arg[] { URL, STORE }, - new Arg[] { CLUSTER, PARTITION_COUNT } ); private final String commandName; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java index f1f5a73e79..61ecaba454 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java @@ -936,11 +936,6 @@ public StoppableNodeStatusResponse getAggregatedHealthStatus( requestString.getBytes()); } - public PartitionResponse createRealTimeTopic(String storeName, String partitionNum) { - QueryParams params = newParams().add(NAME, storeName).add(PARTITION_COUNT, partitionNum); - return request(ControllerRoute.CREATE_REAL_TIME_TOPIC, params, PartitionResponse.class); - } - public MultiNodesStatusResponse listInstancesStatuses(boolean enableReplicas) { QueryParams params = newParams(); if (enableReplicas) { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java index 0e44ef359c..8ddc43c8d5 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java @@ -305,8 +305,7 @@ public enum ControllerRoute { DELETE_UNUSED_VALUE_SCHEMAS( "/delete_unused_value_schemas", HttpMethod.POST, Arrays.asList(CLUSTER, NAME), ControllerApiConstants.VALUE_SCHEMA_IDS - ), GET_INUSE_SCHEMA_IDS("/get_inuse_schema_ids", HttpMethod.GET, Arrays.asList(CLUSTER, NAME)), - CREATE_REAL_TIME_TOPIC("/create_real_time_topic", HttpMethod.POST, Arrays.asList(NAME, PARTITION_COUNT)); + ), GET_INUSE_SCHEMA_IDS("/get_inuse_schema_ids", HttpMethod.GET, Arrays.asList(CLUSTER, NAME)); private final String path; private final HttpMethod httpMethod; diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/server/TestAdminSparkServer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/server/TestAdminSparkServer.java index 9df393d23a..9256626b39 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/server/TestAdminSparkServer.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/server/TestAdminSparkServer.java @@ -39,7 +39,6 @@ import com.linkedin.venice.integration.utils.VeniceControllerWrapper; import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper; import com.linkedin.venice.integration.utils.VeniceServerWrapper; -import com.linkedin.venice.meta.BackupStrategy; import com.linkedin.venice.meta.HybridStoreConfig; import com.linkedin.venice.meta.InstanceStatus; import com.linkedin.venice.meta.Store; @@ -1121,66 +1120,6 @@ public void testCleanupInstanceCustomizedStates() { } } - @Test(timeOut = TEST_TIMEOUT) - public void testCreateRealTimeTopicCall() throws IOException, ExecutionException, InterruptedException { - String clusterName = venice.getClusterNames()[0]; - String storeName = Utils.getUniqueString("testCreateRealTimeTopicCall"); - CloseableHttpAsyncClient httpClient = HttpClientUtils.getMinimalHttpClient(1, 1, Optional.empty()); - httpClient.start(); - - VeniceHelixAdmin childControllerAdmin = venice.getChildRegions().get(0).getRandomController().getVeniceHelixAdmin(); - childControllerAdmin.createStore(clusterName, storeName, "test", "\"string\"", "\"string\""); - - UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams(); - updateStoreParams.setIncrementalPushEnabled(true) - .setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS) - .setNumVersionsToPreserve(3) - .setHybridRewindSeconds(1000) - .setHybridOffsetLagThreshold(1000); - - childControllerAdmin.updateStore(clusterName, storeName, updateStoreParams); - childControllerAdmin.incrementVersionIdempotent(clusterName, storeName, "1", 1, 1); - - // API call with all fields - List params = new ArrayList<>(); - params.add(new BasicNameValuePair(ControllerApiConstants.CLUSTER, clusterName)); - params.add(new BasicNameValuePair(ControllerApiConstants.NAME, storeName)); - params.add(new BasicNameValuePair(ControllerApiConstants.PARTITION_COUNT, "8")); - - makeRealTimeTopicCall(httpClient, params); - - childControllerAdmin.incrementVersionIdempotent(clusterName, storeName, "2", 1, 1); - this.controllerClient - .sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND); - - List versions = childControllerAdmin.getStore(clusterName, storeName).getVersions(); - Assert.assertEquals(versions.size(), 2); - - String oldRealTimeTopicName = Utils.getRealTimeTopicName(versions.get(0)); - String newRealTimeTopicName = Utils.getRealTimeTopicName(versions.get(1)); - - Assert.assertNotEquals(oldRealTimeTopicName, newRealTimeTopicName); - Assert.assertTrue( - childControllerAdmin.getTopicManager() - .containsTopic(childControllerAdmin.getPubSubTopicRepository().getTopic(newRealTimeTopicName))); - - httpClient.close(); - } - - private void makeRealTimeTopicCall(HttpAsyncClient httpClient, List params) - throws IOException, ExecutionException, InterruptedException { - // StringEntity entity = new StringEntity(OBJECT_MAPPER.writeValueAsString(payloads), ContentType.APPLICATION_JSON); - - final HttpPost post = new HttpPost( - venice.getChildRegions().get(0).getControllerConnectString() - + ControllerRoute.CREATE_REAL_TIME_TOPIC.getPath()); - post.setEntity(new UrlEncodedFormEntity(params)); - HttpResponse httpResponse = httpClient.execute(post, null).get(); - - Assert.assertEquals(httpResponse.getStatusLine().getStatusCode(), 200); - String responseString = IOUtils.toString(httpResponse.getEntity().getContent()); - } - private void deleteStore(String storeName) { parentControllerClient.enableStoreReadWrites(storeName, false); parentControllerClient.deleteStore(storeName); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index a42f35828a..244c7006f6 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -4277,7 +4277,7 @@ public void setStorePartitionCount( // is read only in getRealTimeTopic and createInternalStore creation, so modifying currentVersion should not have // any effect. if (partitionCount != store.getPartitionCount()) { - updateRealTimeTopic(store, clusterName, partitionCount); + updateRealTimeTopicName(store); } if (partitionCount != 0) { store.setPartitionCount(partitionCount); @@ -4341,39 +4341,19 @@ void preCheckStorePartitionCountUpdate( } } - private void updateRealTimeTopic(Store store, String clusterName, int partitionCount) { + private void updateRealTimeTopicName(Store store) { String storeName = store.getName(); String oldRealTimeTopicName = Utils.getRealTimeTopicName(store); - String newRealTimeTopicName = Utils.createNewRealTimeTopicName(oldRealTimeTopicName); - PubSubTopic newRealTimeTopic = getPubSubTopicRepository().getTopic(newRealTimeTopicName); + String newRealTimeTopicName; + PubSubTopic newRealTimeTopic; - HelixVeniceClusterResources resources = getHelixVeniceClusterResources(storeName); - try (AutoCloseableLock ignore = resources.getClusterLockManager().createStoreWriteLock(storeName)) { - // The topic might be created by another thread already. Check before creating. - if (getTopicManager().containsTopic(newRealTimeTopic)) { - return; - } - ReadWriteStoreRepository repository = resources.getStoreMetadataRepository(); - if (!store.isHybrid() && !store.isWriteComputationEnabled() && !store.isSystemStore()) { - logAndThrow("Store " + storeName + " is not hybrid, refusing to return a realtime topic"); - } + do { + newRealTimeTopicName = Utils.createNewRealTimeTopicName(oldRealTimeTopicName); + newRealTimeTopic = getPubSubTopicRepository().getTopic(newRealTimeTopicName); + oldRealTimeTopicName = newRealTimeTopicName; + } while (getTopicManager().containsTopic(newRealTimeTopic)); - VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig(); - getTopicManager().createTopic( - newRealTimeTopic, - partitionCount, - clusterConfig.getKafkaReplicationFactorRTTopics(), - store.getRetentionTime(), - false, - clusterConfig.getMinInSyncReplicasRealTimeTopics(), - false); - LOGGER.warn( - "Creating real time topic per topic request for store: {}. " - + "Buffer replay won't start for any existing versions", - storeName); - store.getHybridStoreConfig().setRealTimeTopicName(newRealTimeTopicName); - repository.updateStore(store); - } + store.getHybridStoreConfig().setRealTimeTopicName(newRealTimeTopicName); } void setStorePartitionerConfig(String clusterName, String storeName, PartitionerConfig partitionerConfig) { @@ -4853,8 +4833,8 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto Optional hybridTimeLagThreshold = params.getHybridTimeLagThreshold(); Optional hybridDataReplicationPolicy = params.getHybridDataReplicationPolicy(); Optional hybridBufferReplayPolicy = params.getHybridBufferReplayPolicy(); - Optional realTimeTopicName = Optional.empty(); // to update real time topic name, there is a separate admin - // tool command + Optional realTimeTopicName = Optional.empty(); // real time topic name should only be changed during + // partition count update Optional accessControlled = params.getAccessControlled(); Optional compressionStrategy = params.getCompressionStrategy(); Optional clientDecompressionEnabled = params.getClientDecompressionEnabled(); @@ -6503,7 +6483,7 @@ private void throwStoreAlreadyExists(String clusterName, String storeName) { throw new VeniceStoreAlreadyExistsException(storeName, clusterName); } - public static void throwStoreDoesNotExist(String clusterName, String storeName) { + private static void throwStoreDoesNotExist(String clusterName, String storeName) { String errorMessage = "Store:" + storeName + " does not exist in cluster:" + clusterName; LOGGER.error(errorMessage); throw new VeniceNoStoreException(storeName, clusterName); @@ -6518,7 +6498,7 @@ private void throwClusterNotInitialized(String clusterName) { throw new VeniceNoClusterException(clusterName); } - public static void logAndThrow(String msg) { + private void logAndThrow(String msg) { LOGGER.info(msg); throw new VeniceException(msg); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java index 6500255369..c77d7fafab 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java @@ -16,7 +16,6 @@ import static com.linkedin.venice.controllerapi.ControllerRoute.COMPARE_STORE; import static com.linkedin.venice.controllerapi.ControllerRoute.COMPLETE_MIGRATION; import static com.linkedin.venice.controllerapi.ControllerRoute.CONFIGURE_ACTIVE_ACTIVE_REPLICATION_FOR_CLUSTER; -import static com.linkedin.venice.controllerapi.ControllerRoute.CREATE_REAL_TIME_TOPIC; import static com.linkedin.venice.controllerapi.ControllerRoute.CREATE_STORAGE_PERSONA; import static com.linkedin.venice.controllerapi.ControllerRoute.ClUSTER_HEALTH_INSTANCES; import static com.linkedin.venice.controllerapi.ControllerRoute.DATA_RECOVERY; @@ -651,10 +650,6 @@ public boolean startInner() throws Exception { CLEANUP_INSTANCE_CUSTOMIZED_STATES.getPath(), new VeniceParentControllerRegionStateHandler(admin, clusterRoutes.cleanupInstanceCustomizedStates(admin))); - httpService.post( - CREATE_REAL_TIME_TOPIC.getPath(), - new VeniceParentControllerRegionStateHandler(admin, storesRoutes.createRealTimeTopic(admin))); - httpService.awaitInitialization(); // Wait for server to be initialized Exception e = initFailure.get(); if (e != null) { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoresRoutes.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoresRoutes.java index 06f5b2fb22..9b45e1adec 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoresRoutes.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoresRoutes.java @@ -1,7 +1,5 @@ package com.linkedin.venice.controller.server; -import static com.linkedin.venice.controller.VeniceHelixAdmin.logAndThrow; -import static com.linkedin.venice.controller.VeniceHelixAdmin.throwStoreDoesNotExist; import static com.linkedin.venice.controller.server.VeniceRouteHandler.ACL_CHECK_FAILURE_WARN_MESSAGE_PREFIX; import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER; import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER_DEST; @@ -13,7 +11,6 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.NAME; import static com.linkedin.venice.controllerapi.ControllerApiConstants.OPERATION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.OWNER; -import static com.linkedin.venice.controllerapi.ControllerApiConstants.PARTITION_COUNT; import static com.linkedin.venice.controllerapi.ControllerApiConstants.PARTITION_DETAIL_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.READ_OPERATION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.READ_WRITE_OPERATION; @@ -32,7 +29,6 @@ import static com.linkedin.venice.controllerapi.ControllerRoute.COMPARE_STORE; import static com.linkedin.venice.controllerapi.ControllerRoute.COMPLETE_MIGRATION; import static com.linkedin.venice.controllerapi.ControllerRoute.CONFIGURE_ACTIVE_ACTIVE_REPLICATION_FOR_CLUSTER; -import static com.linkedin.venice.controllerapi.ControllerRoute.CREATE_REAL_TIME_TOPIC; import static com.linkedin.venice.controllerapi.ControllerRoute.DELETE_ALL_VERSIONS; import static com.linkedin.venice.controllerapi.ControllerRoute.DELETE_KAFKA_TOPIC; import static com.linkedin.venice.controllerapi.ControllerRoute.DELETE_STORE; @@ -63,8 +59,6 @@ import com.linkedin.venice.acl.DynamicAccessController; import com.linkedin.venice.controller.Admin; import com.linkedin.venice.controller.AdminCommandExecutionTracker; -import com.linkedin.venice.controller.HelixVeniceClusterResources; -import com.linkedin.venice.controller.VeniceControllerClusterConfig; import com.linkedin.venice.controller.kafka.TopicCleanupService; import com.linkedin.venice.controllerapi.ClusterStaleDataAuditResponse; import com.linkedin.venice.controllerapi.ControllerResponse; @@ -93,7 +87,6 @@ import com.linkedin.venice.exceptions.ResourceStillExistsException; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceNoStoreException; -import com.linkedin.venice.meta.ReadWriteStoreRepository; import com.linkedin.venice.meta.RegionPushDetails; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreDataAudit; @@ -107,7 +100,6 @@ import com.linkedin.venice.pubsub.manager.TopicManager; import com.linkedin.venice.systemstore.schemas.StoreProperties; import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.locks.AutoCloseableLock; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -1105,65 +1097,4 @@ public Route getHeartbeatFromSystemStore(Admin admin) { return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject); }; } - - public Route createRealTimeTopic(Admin admin) { - return new VeniceRouteHandler<>(StoreResponse.class) { - @Override - public void internalHandle(Request request, StoreResponse veniceResponse) { - if (!isAllowListUser(request)) { - veniceResponse.setError("Access Denied!! Only admins can change topic compaction policy!"); - veniceResponse.setErrorType(ErrorType.BAD_REQUEST); - return; - } - AdminSparkServer.validateParams(request, CREATE_REAL_TIME_TOPIC.getParams(), admin); - try { - String clusterName = request.queryParams(CLUSTER); - String storeName = request.queryParams(NAME); - int partitionCount = Integer.parseInt(request.queryParams(PARTITION_COUNT)); - String oldRealTimeTopicName = admin.getRealTimeTopic(clusterName, storeName); - String newRealTimeTopicName = Utils.createNewRealTimeTopicName(oldRealTimeTopicName); - PubSubTopic newRealTimeTopic = admin.getPubSubTopicRepository().getTopic(newRealTimeTopicName); - - HelixVeniceClusterResources resources = admin.getHelixVeniceClusterResources(clusterName); - try (AutoCloseableLock ignore = resources.getClusterLockManager().createStoreWriteLock(storeName)) { - // The topic might be created by another thread already. Check before creating. - if (admin.getTopicManager().containsTopic(newRealTimeTopic)) { - return; - } - ReadWriteStoreRepository repository = resources.getStoreMetadataRepository(); - Store store = repository.getStore(storeName); - if (store == null) { - throwStoreDoesNotExist(clusterName, storeName); - } - if (!store.isHybrid() && !store.isWriteComputationEnabled() && !store.isSystemStore()) { - logAndThrow("Store " + storeName + " is not hybrid, refusing to return a realtime topic"); - } - - VeniceControllerClusterConfig clusterConfig = admin.getHelixVeniceClusterResources(clusterName).getConfig(); - admin.getTopicManager() - .createTopic( - newRealTimeTopic, - partitionCount, - clusterConfig.getKafkaReplicationFactorRTTopics(), - store.getRetentionTime(), - false, - // Note: do not enable RT compaction! Might make jobs in Online/Offline model stuck - clusterConfig.getMinInSyncReplicasRealTimeTopics(), - false); - // TODO: if there is an online version from a batch push before this store was hybrid then we won't start - // replicating to it. A new version must be created. - LOGGER.warn( - "Creating real time topic per topic request for store: {}. " - + "Buffer replay won't start for any existing versions", - storeName); - store.getHybridStoreConfig().setRealTimeTopicName(newRealTimeTopicName); - repository.updateStore(store); - } - - } catch (PubSubTopicDoesNotExistException e) { - veniceResponse.setError("Topic does not exist!! Message: " + e.getMessage()); - } - } - }; - } } diff --git a/services/venice-controller/src/main/resources/avro/AdminOperation/v85/AdminOperation.avsc b/services/venice-controller/src/main/resources/avro/AdminOperation/v85/AdminOperation.avsc new file mode 100644 index 0000000000..60edadd30e --- /dev/null +++ b/services/venice-controller/src/main/resources/avro/AdminOperation/v85/AdminOperation.avsc @@ -0,0 +1,1171 @@ +{ + "name": "AdminOperation", + "namespace": "com.linkedin.venice.controller.kafka.protocol.admin", + "type": "record", + "fields": [ + { + "name": "operationType", + "doc": "0 => StoreCreation, 1 => ValueSchemaCreation, 2 => PauseStore, 3 => ResumeStore, 4 => KillOfflinePushJob, 5 => DisableStoreRead, 6 => EnableStoreRead, 7=> DeleteAllVersions, 8=> SetStoreOwner, 9=> SetStorePartitionCount, 10=> SetStoreCurrentVersion, 11=> UpdateStore, 12=> DeleteStore, 13=> DeleteOldVersion, 14=> MigrateStore, 15=> AbortMigration, 16=>AddVersion, 17=> DerivedSchemaCreation, 18=>SupersetSchemaCreation, 19=>EnableNativeReplicationForCluster, 20=>MetadataSchemaCreation, 21=>EnableActiveActiveReplicationForCluster, 25=>CreatePersona, 26=>DeletePersona, 27=>UpdatePersona, 28=>RollbackCurrentVersion, 29=>RollforwardCurrentVersion", + "type": "int" + }, { + "name": "executionId", + "doc": "ID of a command execution which is used to query the status of this command.", + "type": "long", + "default": 0 + }, { + "name": "payloadUnion", + "doc": "This contains the main payload of the admin operation", + "type": [ + { + "name": "StoreCreation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "owner", + "type": "string" + }, + { + "name": "keySchema", + "type": { + "type": "record", + "name": "SchemaMeta", + "fields": [ + {"name": "schemaType", "type": "int", "doc": "0 => Avro-1.4, and we can add more if necessary"}, + {"name": "definition", "type": "string"} + ] + } + }, + { + "name": "valueSchema", + "type": "SchemaMeta" + } + ] + }, + { + "name": "ValueSchemaCreation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "schema", + "type": "SchemaMeta" + }, + { + "name": "schemaId", + "type": "int" + }, + { + "name": "doUpdateSupersetSchemaID", + "type": "boolean", + "doc": "Whether this superset schema ID should be updated to be the value schema ID for this store.", + "default": false + } + ] + }, + { + "name": "PauseStore", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "ResumeStore", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "KillOfflinePushJob", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "kafkaTopic", + "type": "string" + } + ] + }, + { + "name": "DisableStoreRead", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "EnableStoreRead", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "DeleteAllVersions", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "SetStoreOwner", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "owner", + "type": "string" + } + ] + }, + { + "name": "SetStorePartitionCount", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "partitionNum", + "type": "int" + } + ] + }, + { + "name": "SetStoreCurrentVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "currentVersion", + "type": "int" + } + ] + }, + { + "name": "UpdateStore", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "owner", + "type": "string" + }, + { + "name": "partitionNum", + "type": "int" + }, + { + "name": "currentVersion", + "type": "int" + }, + { + "name": "enableReads", + "type": "boolean" + }, + { + "name": "enableWrites", + "type": "boolean" + }, + { + "name": "storageQuotaInByte", + "type": "long", + "default": 21474836480 + }, + { + "name": "readQuotaInCU", + "type": "long", + "default": 1800 + }, + { + "name": "hybridStoreConfig", + "type": [ + "null", + { + "name": "HybridStoreConfigRecord", + "type": "record", + "fields": [ + { + "name": "rewindTimeInSeconds", + "type": "long" + }, + { + "name": "offsetLagThresholdToGoOnline", + "type": "long" + }, + { + "name": "producerTimestampLagThresholdToGoOnlineInSeconds", + "type": "long", + "default": -1 + }, + { + "name": "dataReplicationPolicy", + "doc": "Real-time Samza job data replication policy. Using int because Avro Enums are not evolvable 0 => NON_AGGREGATE, 1 => AGGREGATE, 2 => NONE, 3 => ACTIVE_ACTIVE", + "type": "int", + "default": 0 + }, + { + "name": "bufferReplayPolicy", + "type": "int", + "doc": "Policy that will be used during buffer replay. rewindTimeInSeconds defines the delta. 0 => REWIND_FROM_EOP (replay from 'EOP - rewindTimeInSeconds'), 1 => REWIND_FROM_SOP (replay from 'SOP - rewindTimeInSeconds')", + "default": 0 + }, + {"name": "realTimeTopicName", "type": "string", "default": "", "doc": "Name of the real time topic this store/version uses"} + ] + } + ], + "default": null + }, + { + "name": "accessControlled", + "type": "boolean", + "default": false + }, + { + "name": "compressionStrategy", + "doc": "Using int because Avro Enums are not evolvable", + "type": "int", + "default": 0 + }, + { + "name": "chunkingEnabled", + "type": "boolean", + "default": false + }, + { + "name": "rmdChunkingEnabled", + "type": "boolean", + "default": false + }, + { + "name": "singleGetRouterCacheEnabled", + "aliases": ["routerCacheEnabled"], + "type": "boolean", + "default": false + }, + { + "name": "batchGetRouterCacheEnabled", + "type": "boolean", + "default": false + }, + { + "name": "batchGetLimit", + "doc": "The max key number allowed in batch get request, and Venice will use cluster-level config if the limit (not positive) is not valid", + "type": "int", + "default": -1 + }, + { + "name": "numVersionsToPreserve", + "doc": "The max number of versions the store should preserve. Venice will use cluster-level config if the number is 0 here.", + "type": "int", + "default": 0 + }, + { + "name": "incrementalPushEnabled", + "doc": "a flag to see if the store supports incremental push or not", + "type": "boolean", + "default": false + }, + { + "name": "separateRealTimeTopicEnabled", + "doc": "Flag to see if the store supports separate real-time topic for incremental push.", + "type": "boolean", + "default": false + }, + { + "name": "isMigrating", + "doc": "Whether or not the store is in the process of migration", + "type": "boolean", + "default": false + }, + { + "name": "writeComputationEnabled", + "doc": "Whether write-path computation feature is enabled for this store", + "type": "boolean", + "default": false + }, + { + "name": "replicationMetadataVersionID", + "doc": "RMD (Replication metadata) version ID on the store-level. Default -1 means NOT_SET and the cluster-level RMD version ID should be used for stores.", + "type": "int", + "default": -1 + }, + { + "name": "readComputationEnabled", + "doc": "Whether read-path computation feature is enabled for this store", + "type": "boolean", + "default": false + }, + { + "name": "bootstrapToOnlineTimeoutInHours", + "doc": "Maximum number of hours allowed for the store to transition from bootstrap to online state", + "type": "int", + "default": 24 + }, + { + "name": "leaderFollowerModelEnabled", + "doc": "Whether or not to use leader follower state transition model for upcoming version", + "type": "boolean", + "default": false + }, + { + "name": "backupStrategy", + "doc": "Strategies to store backup versions.", + "type": "int", + "default": 0 + }, + { + "name": "clientDecompressionEnabled", + "type": "boolean", + "default": true + }, + { + "name": "schemaAutoRegisterFromPushJobEnabled", + "type": "boolean", + "default": false + }, + { + "name": "hybridStoreOverheadBypass", + "type": "boolean", + "default": false + }, + { + "name": "hybridStoreDiskQuotaEnabled", + "doc": "Whether or not to enable disk storage quota for a hybrid store", + "type": "boolean", + "default": false + }, + { + "name": "ETLStoreConfig", + "type": [ + "null", + { + "name": "ETLStoreConfigRecord", + "type": "record", + "fields": [ + { + "name": "etledUserProxyAccount", + "type": ["null", "string"] + }, + { + "name": "regularVersionETLEnabled", + "type": "boolean" + }, + { + "name": "futureVersionETLEnabled", + "type": "boolean" + } + ] + } + ], + "default": null + }, + { + "name": "partitionerConfig", + "type": [ + "null", + { + "name": "PartitionerConfigRecord", + "type": "record", + "fields": [ + { + "name": "partitionerClass", + "type": "string" + }, + { + "name": "partitionerParams", + "type": { + "type": "map", + "values": "string" + } + }, + { + "name": "amplificationFactor", + "type": "int" + } + ] + } + ], + "default": null + }, + { + "name": "nativeReplicationEnabled", + "type": "boolean", + "default": false + }, + { + "name": "pushStreamSourceAddress", + "type": ["null", "string"], + "default": null + }, + { + "name": "largestUsedVersionNumber", + "type": ["null", "int"], + "default": null + }, + { + "name": "incrementalPushPolicy", + "doc": "Incremental Push Policy to reconcile with real time pushes. Using int because Avro Enums are not evolvable 0 => PUSH_TO_VERSION_TOPIC, 1 => INCREMENTAL_PUSH_SAME_AS_REAL_TIME", + "type": "int", + "default": 0 + }, + { + "name": "backupVersionRetentionMs", + "type": "long", + "doc": "Backup version retention time after a new version is promoted to the current version, if not specified, Venice will use the configured retention as the default policy", + "default": -1 + }, + { + "name": "replicationFactor", + "doc": "number of replica each store version will have", + "type": "int", + "default": 3 + }, + { + "name": "migrationDuplicateStore", + "doc": "Whether or not the store is a duplicate store in the process of migration", + "type": "boolean", + "default": false + }, + { + "name": "nativeReplicationSourceFabric", + "doc": "The source fabric to be used when the store is running in Native Replication mode.", + "type": ["null", "string"], + "default": null + }, + { + "name": "activeActiveReplicationEnabled", + "doc": "A command option to enable/disable Active/Active replication feature for a store", + "type": "boolean", + "default": false + }, + { + "name": "disableMetaStore", + "doc": "An UpdateStore command option to disable the companion meta system store", + "type": "boolean", + "default": false + }, + { + "name": "disableDavinciPushStatusStore", + "doc": "An UpdateStore command option to disable the companion davinci push status store", + "type": "boolean", + "default": false + }, + { + "name": "applyTargetVersionFilterForIncPush", + "doc": "An UpdateStore command option to enable/disable applying the target version filter for incremental pushes", + "type": "boolean", + "default": false + }, + { + "name": "updatedConfigsList", + "doc": "The list that contains all updated configs by the UpdateStore command. Most of the fields in UpdateStore are not optional, and changing those fields to Optional (Union) is not a backward compatible change, so we have to add an addition array field to record all updated configs in parent controller.", + "type": { + "type": "array", + "items": "string" + }, + "default": [] + }, + { + "name": "replicateAllConfigs", + "doc": "A flag to indicate whether all store configs in parent cluster will be replicated to child clusters; true by default, so that existing UpdateStore messages in Admin topic will behave the same as before.", + "type": "boolean", + "default": true + }, + { + "name": "regionsFilter", + "doc": "A list of regions that will be impacted by the UpdateStore command", + "type": ["null", "string"], + "default": null + }, + { + "name": "storagePersona", + "doc": "The name of the StoragePersona to add to the store", + "type": ["null", "string"], + "default": null + }, + { + "name": "views", + "doc": "A map of views which describe and configure a downstream view of a venice store. Keys in this map are for convenience of managing configs.", + "type": ["null", + { + "type":"map", + "java-key-class": "java.lang.String", + "avro.java.string": "String", + "values": { + "name": "StoreViewConfigRecord", + "type": "record", + "doc": "A configuration for a particular view. This config should inform Venice leaders how to transform and transmit data to destination views.", + "fields": [ + { + "name": "viewClassName", + "type": "string", + "doc": "This informs what kind of view we are materializing. This then informs what kind of parameters are passed to parse this input. This is expected to be a fully formed class path name for materialization.", + "default": "" + }, + { + "name": "viewParameters", + "doc": "Optional parameters to be passed to the given view config.", + "type": ["null", + { + "type": "map", + "java-key-class": "java.lang.String", + "avro.java.string": "String", + "values": { "type": "string", "avro.java.string": "String" } + } + ], + "default": null + } + ] + } + }], + "default": null + }, + { + "name": "latestSuperSetValueSchemaId", + "doc": "The schema id for the latest superset schema", + "type" : "int", + "default": -1 + }, + { + "name": "storageNodeReadQuotaEnabled", + "doc": "Whether storage node read quota is enabled for this store", + "type": "boolean", + "default": false + }, + { + "name": "minCompactionLagSeconds", + "doc": "Store-level version topic min compaction lag", + "type": "long", + "default": -1 + }, + { + "name": "maxCompactionLagSeconds", + "doc": "Store-level version topic max compaction lag", + "type": "long", + "default": -1 + }, + { + "name": "maxRecordSizeBytes", + "doc": "Store-level maximum size of any record in bytes for batch push jobs", + "type": "int", + "default": -1 + }, + { + "name": "maxNearlineRecordSizeBytes", + "doc": "Store-level maximum size of any record in bytes for nearline jobs with partial updates", + "type": "int", + "default": -1 + }, + { + "name": "unusedSchemaDeletionEnabled", + "doc": "Whether unused schema deletion is enabled or not.", + "type": "boolean", + "default": false + }, + { + "name": "blobTransferEnabled", + "doc": "Flag to indicate if the blob transfer is allowed or not", + "type": "boolean", + "default": false + }, + { + "name": "nearlineProducerCompressionEnabled", + "doc": "Flag to control whether the producer in Server for nearline workload will enable compression or not", + "type": "boolean", + "default": true + }, + { + "name": "nearlineProducerCountPerWriter", + "doc": "How many producers will be used for the nearline producer in Server to improve producing throughput", + "type": "int", + "default": 1 + }, + { + "name": "targetSwapRegion", + "doc": "Controls what region to swap in the current version during target colo push", + "type": ["null","string"], + "default": null + }, + { + "name": "targetSwapRegionWaitTime", + "doc": "Controls how long to wait in minutes before swapping the version on the regions", + "type": "int", + "default": 60 + }, + { + "name": "isDaVinciHeartBeatReported", + "doc": "Flag to indicate whether DVC is bootstrapping and sending heartbeats", + "type": "boolean", + "default": false + }, + { + "name": "updateRealTimeTopic", + "doc": "Flag to indicate whether to update real time topic when updating partition count", + "type": "boolean", + "default": false + } + ] + }, + { + "name": "DeleteStore", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "largestUsedVersionNumber", + "type": "int" + } + ] + }, + { + "name": "DeleteOldVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "versionNum", + "type": "int" + } + ] + }, + { + "name": "MigrateStore", + "type": "record", + "fields": [ + { + "name": "srcClusterName", + "type": "string" + }, + { + "name": "destClusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "AbortMigration", + "type": "record", + "fields": [ + { + "name": "srcClusterName", + "type": "string" + }, + { + "name": "destClusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "AddVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "pushJobId", + "type": "string" + }, + { + "name": "versionNum", + "type": "int" + }, + { + "name": "numberOfPartitions", + "type": "int" + }, + { + "name": "pushType", + "doc": "The push type of the new version, 0 => BATCH, 1 => STREAM_REPROCESSING. Previous add version messages will default to BATCH and this is a safe because they were created when BATCH was the only version type", + "type": "int", + "default": 0 + }, + { + "name": "pushStreamSourceAddress", + "type": ["null", "string"], + "default": null + }, + { + "name": "rewindTimeInSecondsOverride", + "doc": "The overridable rewind time config for this specific version of a hybrid store, and if it is not specified, the new version will use the store-level rewind time config", + "type": "long", + "default": -1 + }, + { + "name": "timestampMetadataVersionId", + "doc": "The A/A metadata schema version ID that will be used to deserialize metadataPayload.", + "type": "int", + "default": -1 + }, + { + "name": "versionSwapDeferred", + "doc": "Indicates if swapping this version to current version after push completion should be initiated or not", + "type": "boolean", + "default": false + }, + { + "name": "targetedRegions", + "doc": "The list of regions that is separated by comma for targeted region push. If set, this admin message should only be consumed by the targeted regions", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "default": null + }, + { + "name": "repushSourceVersion", + "doc": "Indicates the source version from which a repush version is created", + "type": "int", + "default": -1 + } + ] + }, + { + "name": "DerivedSchemaCreation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "schema", + "type": "SchemaMeta" + }, + { + "name": "valueSchemaId", + "type": "int" + }, + { + "name": "derivedSchemaId", + "type": "int" + } + ] + }, + { + "name": "SupersetSchemaCreation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "valueSchema", + "type": "SchemaMeta" + }, + { + "name": "valueSchemaId", + "type": "int" + }, + { + "name": "supersetSchema", + "type": "SchemaMeta" + }, + { + "name": "supersetSchemaId", + "type": "int" + } + ] + }, + { + "name": "ConfigureNativeReplicationForCluster", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeType", + "type": "string" + }, + { + "name": "enabled", + "type": "boolean" + }, + { + "name": "nativeReplicationSourceRegion", + "doc": "The source region to be used when the store is running in Native Replication mode.", + "type": ["null", "string"], + "default": null + }, + { + "name": "regionsFilter", + "type": ["null", "string"], + "default": null + } + ] + }, + { + "name": "MetadataSchemaCreation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "valueSchemaId", + "type": "int" + }, + { + "name": "metadataSchema", + "type": "SchemaMeta" + }, + { + "name": "timestampMetadataVersionId", + "type": "int", + "aliases": ["metadataVersionId"], + "default": -1 + } + ] + }, + { + "name": "ConfigureActiveActiveReplicationForCluster", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeType", + "type": "string" + }, + { + "name": "enabled", + "type": "boolean" + }, + { + "name": "regionsFilter", + "type": ["null", "string"], + "default": null + } + ] + }, { + "name": "ConfigureIncrementalPushForCluster", + "doc": "A command to migrate all incremental push stores in a cluster to a specific incremental push policy.", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "incrementalPushPolicyToFilter", + "doc": "If this batch update command is trying to configure existing incremental push store type, their incremental push policy should also match this filter before the batch update command applies any change to them. Default value is -1, meaning there is no filter.", + "type": "int", + "default": -1 + }, + { + "name": "incrementalPushPolicyToApply", + "doc": "This field will determine what incremental push policy will be applied to the selected stores. Default value is 1, which is the INCREMENTAL_PUSH_SAME_AS_REAL_TIME policy", + "type": "int", + "default": 1 + }, + { + "name": "regionsFilter", + "type": ["null", "string"], + "default": null + } + ] + }, { + "name": "MetaSystemStoreAutoCreationValidation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, { + "name": "PushStatusSystemStoreAutoCreationValidation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, { + "name": "CreateStoragePersona", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "name", + "type": "string" + }, + { + "name": "quotaNumber", + "type": "long" + }, + { + "name": "storesToEnforce", + "type": { + "type": "array", + "items": "string", + "default": [] + } + }, + { + "name": "owners", + "type": { + "type": "array", + "items": "string", + "default": [] + } + } + ] + }, { + "name": "DeleteStoragePersona", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "name", + "type": "string" + } + ] + }, { + "name": "UpdateStoragePersona", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, { + "name": "name", + "type": "string" + }, { + "name": "quotaNumber", + "type": ["null","long"], + "default": null + }, { + "name": "storesToEnforce", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "default": null + }, { + "name": "owners", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "default": null + } + ] + }, + { + "name": "DeleteUnusedValueSchemas", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "schemaIds", + "type": { + "type": "array", + "items": "int", + "default": [] + } + } + ] + }, + { + "name": "RollbackCurrentVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "regionsFilter", + "doc": "A list of regions that will be impacted by the RollbackCurrentVersion command", + "type": ["null", "string"], + "default": null + } + ] + }, + { + "name": "RollForwardCurrentVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "regionsFilter", + "doc": "A list of regions that will be impacted by the RollForwardCurrentVersion command", + "type": ["null", "string"], + "default": null + } + ] + } + ] + } + ] +}