Skip to content

Commit

Permalink
YARN-11548. [Federation] Router Supports Format FederationStateStore. (
Browse files Browse the repository at this point in the history
…#6116) Contributed by Shilun Fan.

Reviewed-by: Inigo Goiri <[email protected]>
Signed-off-by: Shilun Fan <[email protected]>
  • Loading branch information
slfan1989 authored Nov 7, 2023
1 parent 597ceaa commit 72d7b43
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,11 @@ default void checkVersion() throws Exception {
", but loading version " + loadedVersion);
}
}

/**
* Deletes the data held in the state store.
*
* <p>Each implementation decides how its own backing storage is cleared.
*
* @throws Exception if an error occurs while deleting the store.
*/
void deleteStateStore() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,16 @@ public void storeVersion() throws Exception {
version = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
}

/**
 * Empties this in-memory store: every entry collection is cleared and the
 * two counters are replaced by fresh {@link AtomicInteger} instances.
 *
 * @throws Exception declared by the interface method being overridden.
 */
@Override
public void deleteStateStore() throws Exception {
  // Drop all stored entries.
  membership.clear();
  applications.clear();
  reservations.clear();
  policies.clear();

  // Restart both counters from a brand-new AtomicInteger.
  masterKeyId = new AtomicInteger();
  sequenceNum = new AtomicInteger();
}

@Override
public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
AddReservationHomeSubClusterRequest request) throws YarnException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.List;
import java.util.TimeZone;
Expand Down Expand Up @@ -216,6 +217,10 @@ public class SQLFederationStateStore implements FederationStateStore {
private static final String CALL_SP_LOAD_VERSION =
"{call sp_getVersion(?, ?)}";

// Names of the tables that truncateTable() iterates over and clears one by one.
private static final List<String> TABLES = new ArrayList<>(
Arrays.asList("applicationsHomeSubCluster", "membership", "policies", "versions",
"reservationsHomeSubCluster", "masterKeys", "delegationTokens", "sequenceTable"));

private Calendar utcCalendar =
Calendar.getInstance(TimeZone.getTimeZone("UTC"));

Expand Down Expand Up @@ -1122,6 +1127,11 @@ public void storeVersion() throws Exception {
storeVersion(fedVersion, versionComment);
}

/**
* Deletes the state store by delegating to truncateTable().
*
* @throws Exception declared by the interface method being overridden.
*/
@Override
public void deleteStateStore() throws Exception {
truncateTable();
}

/**
* Store the Federation Version in the database.
*
Expand Down Expand Up @@ -2077,6 +2087,32 @@ private int querySequenceTable(String sequenceName, boolean isUpdate){
}
}

/**
 * Clears every table named in TABLES, one table at a time, using a single
 * connection obtained via getConnection(false).
 *
 * <p>Any failure while clearing is rethrown wrapped in a RuntimeException.
 * In all cases the connection is passed to
 * FederationStateStoreUtils.returnToPool afterwards.
 */
private void truncateTable() {
  Connection conn = null;
  try {
    conn = getConnection(false);
    final FederationQueryRunner queryRunner = new FederationQueryRunner();
    for (String tableName : TABLES) {
      LOG.info("truncate table = {} start.", tableName);
      queryRunner.truncateTable(conn, tableName);
      LOG.info("truncate table = {} finished.", tableName);
    }
  } catch (Exception ex) {
    throw new RuntimeException("Could not truncate table!", ex);
  } finally {
    // Only the connection is handed back; no statement is held at this level.
    try {
      FederationStateStoreUtils.returnToPool(LOG, null, conn);
    } catch (YarnException ex) {
      LOG.error("close connection error.", ex);
    }
  }
}

@VisibleForTesting
public HikariDataSource getDataSource() {
return dataSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
private ZKFederationStateStoreOpDurations opDurations =
ZKFederationStateStoreOpDurations.getInstance();

private Configuration configuration;

/*
* Indicates different app attempt state store operations.
*/
Expand All @@ -251,7 +253,7 @@ private final static class AppNodeSplitInfo {
public void init(Configuration conf) throws YarnException {

LOG.info("Initializing ZooKeeper connection");

this.configuration = conf;
maxAppsInStateStore = conf.getInt(
YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
Expand All @@ -273,13 +275,8 @@ public void init(Configuration conf) throws YarnException {
reservationsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_RESERVATION);
versionNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_VERSION);

String hierarchiesPath = getNodePath(appsZNode, ROUTER_APP_ROOT_HIERARCHIES);
routerAppRootHierarchies = new HashMap<>();
routerAppRootHierarchies.put(0, appsZNode);
for (int splitIndex = 1; splitIndex <= HIERARCHIES_LEVEL; splitIndex++) {
routerAppRootHierarchies.put(splitIndex,
getNodePath(hierarchiesPath, Integer.toString(splitIndex)));
}
// Initialize hierarchical path
initHierarchiesPath();

appIdNodeSplitIndex = conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
Expand All @@ -302,26 +299,7 @@ public void init(Configuration conf) throws YarnException {
ROUTER_RM_DT_MASTER_KEY_ID_ZNODE_NAME);

// Create base znode for each entity
try {
List<ACL> zkAcl = ZKCuratorManager.getZKAcls(conf);
zkManager.createRootDirRecursively(membershipZNode, zkAcl);
zkManager.createRootDirRecursively(appsZNode, zkAcl);
zkManager.createRootDirRecursively(
getNodePath(appsZNode, ROUTER_APP_ROOT_HIERARCHIES));
for (int splitIndex = 1; splitIndex <= HIERARCHIES_LEVEL; splitIndex++) {
zkManager.createRootDirRecursively(
routerAppRootHierarchies.get(splitIndex));
}
zkManager.createRootDirRecursively(policiesZNode, zkAcl);
zkManager.createRootDirRecursively(reservationsZNode, zkAcl);
zkManager.createRootDirRecursively(routerRMDTSecretManagerRoot, zkAcl);
zkManager.createRootDirRecursively(routerRMDTMasterKeysRootPath, zkAcl);
zkManager.createRootDirRecursively(routerRMDelegationTokensRootPath, zkAcl);
zkManager.createRootDirRecursively(versionNode, zkAcl);
} catch (Exception e) {
String errMsg = "Cannot create base directories: " + e.getMessage();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
createBaseZNodeForEachEntity();

// Distributed sequenceNum.
try {
Expand Down Expand Up @@ -831,6 +809,60 @@ public void storeVersion() throws Exception {
put(versionNode, data, isUpdate);
}

/**
 * Rebuilds routerAppRootHierarchies: index 0 maps to appsZNode, and each
 * index from 1 to HIERARCHIES_LEVEL maps to a child node, named after the
 * index, under the ROUTER_APP_ROOT_HIERARCHIES node of appsZNode.
 */
private void initHierarchiesPath() {
  final String hierarchiesRoot = getNodePath(appsZNode, ROUTER_APP_ROOT_HIERARCHIES);
  routerAppRootHierarchies = new HashMap<>();
  routerAppRootHierarchies.put(0, appsZNode);
  int level = 1;
  while (level <= HIERARCHIES_LEVEL) {
    String levelPath = getNodePath(hierarchiesRoot, Integer.toString(level));
    routerAppRootHierarchies.put(level, levelPath);
    level++;
  }
}

/**
 * Creates the base znode of every entity kept in this store.
 *
 * <p>The ACLs are read from the stored configuration. The application
 * hierarchy nodes are created through the overload that takes no ACL list;
 * all other nodes are created with the ACLs.
 *
 * @throws YarnException declared for the failure path, which is routed
 *     through FederationStateStoreUtils.logAndThrowStoreException.
 */
private void createBaseZNodeForEachEntity() throws YarnException {
  try {
    final List<ACL> acls = ZKCuratorManager.getZKAcls(configuration);

    zkManager.createRootDirRecursively(membershipZNode, acls);
    zkManager.createRootDirRecursively(appsZNode, acls);

    // Application hierarchy nodes: root first, then one node per level.
    final String hierarchiesRoot = getNodePath(appsZNode, ROUTER_APP_ROOT_HIERARCHIES);
    zkManager.createRootDirRecursively(hierarchiesRoot);
    for (int level = 1; level <= HIERARCHIES_LEVEL; level++) {
      zkManager.createRootDirRecursively(routerAppRootHierarchies.get(level));
    }

    // Remaining base nodes, created in this fixed order.
    final String[] remainingZNodes = {
        policiesZNode,
        reservationsZNode,
        routerRMDTSecretManagerRoot,
        routerRMDTMasterKeysRootPath,
        routerRMDelegationTokensRootPath,
        versionNode
    };
    for (String znode : remainingZNodes) {
      zkManager.createRootDirRecursively(znode, acls);
    }
  } catch (Exception ex) {
    String message = "Cannot create base directories: " + ex.getMessage();
    FederationStateStoreUtils.logAndThrowStoreException(LOG, message);
  }
}

/**
 * Deletes the base znodes of this store, then recomputes the hierarchy
 * paths and creates the base znodes again.
 *
 * @throws Exception if deleting or re-creating a znode fails.
 */
@Override
public void deleteStateStore() throws Exception {

  // Base znodes to remove, in the order they are deleted.
  final String[] znodesToDelete = {
      appsZNode,
      membershipZNode,
      policiesZNode,
      reservationsZNode,
      routerRMDTSecretManagerRoot,
      routerRMDTMasterKeysRootPath,
      routerRMDelegationTokensRootPath,
      versionNode
  };
  for (String znode : znodesToDelete) {
    zkManager.delete(znode);
  }

  // Recompute the hierarchical application paths.
  initHierarchiesPath();

  // Re-create the base znodes so the base paths exist again.
  createBaseZNodeForEachEntity();
}

/**
* Get the subcluster for an application.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,39 @@ public void updateSequenceTable(Connection connection, String sequenceName, int
}
}

/**
 * Deletes all rows of the given table and commits the change.
 *
 * <p>The delete statement is chosen by getTruncateStatement according to the
 * database type detected from the connection. If the statement or the commit
 * fails, the connection is rolled back via rollbackDBConn.
 *
 * @param connection the connection on which the statement is executed.
 * @param tableName the name of the table to clear.
 * @throws SQLException if the table could not be cleared; the original
 *     exception is attached as the cause.
 */
public void truncateTable(Connection connection, String tableName)
    throws SQLException {
  DbType dbType = DatabaseProduct.getDbType(connection);
  String deleteSQL = getTruncateStatement(dbType, tableName);
  boolean committed = false;
  Statement statement = null;
  try {
    statement = connection.createStatement();
    statement.execute(deleteSQL);
    connection.commit();
    committed = true;
  } catch (SQLException e) {
    // Keep the original exception as the cause so its stack trace is not lost.
    throw new SQLException("Unable to truncateTable due to: " + e.getMessage(), e);
  } finally {
    if (!committed) {
      rollbackDBConn(connection);
    }
    close(statement);
  }
}

/**
 * Builds the SQL statement used to remove all rows from a table.
 *
 * <p>For MySQL the table name is quoted with backticks, which MySQL accepts
 * as identifier quotes whether or not the ANSI_QUOTES SQL mode is enabled.
 * Double quotes are only treated as identifier quotes under ANSI_QUOTES and
 * are otherwise read as a string literal.
 *
 * @param dbType the database type the statement is built for.
 * @param tableName the name of the table to clear.
 * @return the DELETE statement for the given table.
 */
private String getTruncateStatement(DbType dbType, String tableName) {
  if (isMYSQL(dbType)) {
    return "DELETE FROM `" + tableName + "`";
  } else {
    return "DELETE FROM " + tableName;
  }
}

/**
 * Tells whether the given database type is MySQL.
 *
 * @param dbType the database type to check.
 * @return true if dbType is DbType.MYSQL, false otherwise.
 */
private boolean isMYSQL(DbType dbType) {
  return DbType.MYSQL == dbType;
}

static void rollbackDBConn(Connection dbConn) {
try {
if (dbConn != null && !dbConn.isClosed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1118,4 +1118,8 @@ public ApplicationSubmissionContext getApplicationSubmissionContext(ApplicationI
/**
* Returns the federation cache held by this object.
*
* @return the value of the federationCache field.
*/
public FederationCache getFederationCache() {
return federationCache;
}

/**
* Deletes the state store by delegating to stateStore.deleteStateStore().
*
* @throws Exception if the underlying state store fails to delete its data.
*/
public void deleteStore() throws Exception {
stateStore.deleteStateStore();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

/**
* Base class for FederationMembershipStateStore implementations.
Expand Down Expand Up @@ -120,6 +121,7 @@ public void before() throws IOException, YarnException {

/**
* Runs after each test: first exercises deleteStateStore through
* testDeleteStateStore(), then closes the state store.
*
* @throws Exception if the delete check or the close fails.
*/
@After
public void after() throws Exception {
// The delete check must run before close(), while the store is still open.
testDeleteStateStore();
stateStore.close();
}

Expand Down Expand Up @@ -1112,4 +1114,26 @@ public void testGetApplicationHomeSubClusterWithContext() throws Exception {
assertEquals(subClusterId, applicationHomeSubCluster.getHomeSubCluster());
assertEquals(context, applicationHomeSubCluster.getApplicationSubmissionContext());
}

/**
 * Deletes the state store and verifies that neither sub-clusters nor
 * application home sub-cluster entries can be found afterwards.
 *
 * @throws Exception if deleting or querying the state store fails.
 */
public void testDeleteStateStore() throws Exception {
  // Wipe the store under test.
  FederationStateStore storeUnderTest = this.getStateStore();
  storeUnderTest.deleteStateStore();

  // The sub-cluster query must return an empty, non-null list.
  GetSubClustersInfoRequest subClustersRequest = GetSubClustersInfoRequest.newInstance(true);
  List<SubClusterInfo> remainingSubClusters =
      stateStore.getSubClusters(subClustersRequest).getSubClusters();
  assertNotNull(remainingSubClusters);
  assertEquals(0, remainingSubClusters.size());

  // The applications query must return an empty, non-null list as well.
  GetApplicationsHomeSubClusterRequest appsRequest =
      GetApplicationsHomeSubClusterRequest.newInstance();
  GetApplicationsHomeSubClusterResponse appsResponse =
      stateStore.getApplicationsHomeSubCluster(appsRequest);
  assertNotNull(appsResponse);
  List<ApplicationHomeSubCluster> remainingApps = appsResponse.getAppsHomeSubClusters();
  assertNotNull(remainingApps);
  assertEquals(0, remainingApps.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ protected FederationStateStore createStateStore() {
conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL,
DATABASE_URL + System.currentTimeMillis());
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_SQL_MAXCONNECTIONS, 10);
super.setConf(conf);
sqlFederationStateStore = new HSQLDBFederationStateStore();
return sqlFederationStateStore;
Expand Down Expand Up @@ -647,6 +648,6 @@ public void testCheckHikariDataSourceParam() throws SQLException {
assertEquals(10000, connTimeOut);
assertEquals("YARN-Federation-DataBasePool", poolName);
assertEquals(1, minimumIdle);
assertEquals(1, maximumPoolSize);
assertEquals(10, maximumPoolSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,11 @@ public void checkVersion() throws Exception {
stateStoreClient.checkVersion();
}

/**
* Deletes the state store by delegating to stateStoreClient.deleteStateStore().
*
* @throws Exception if the wrapped state store client fails to delete its data.
*/
@Override
public void deleteStateStore() throws Exception {
stateStoreClient.deleteStateStore();
}

@Override
public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
GetSubClusterPolicyConfigurationRequest request) throws YarnException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,15 @@ public static void removeApplication(Configuration conf, String applicationId)
LOG.info("Application is deleted from state store");
}

private static void handFormatStateStore() {
// TODO: YARN-11548. [Federation] Router Supports Format FederationStateStore.
System.err.println("format-state-store is not yet supported.");
/**
 * Handles the format-state-store command by deleting the Federation state
 * store through the FederationStateStoreFacade.
 *
 * <p>Progress is printed to standard output; a failure is printed to
 * standard error and not rethrown.
 *
 * @param conf the configuration used to obtain the facade instance.
 */
private static void handFormatStateStore(Configuration conf) {
  try {
    System.out.println("Deleting Federation state store.");
    FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance(conf);
    facade.deleteStore();
    // Report success only after the delete has completed without an exception.
    System.out.println("Federation state store has been cleaned.");
  } catch (Exception e) {
    System.err.println("Delete Federation state store error, exception = " + e);
  }
}

private static void handRemoveApplicationFromStateStore(Configuration conf,
Expand Down Expand Up @@ -409,7 +415,7 @@ private static void executeRouterCommand(Configuration conf, String[] args) {
CommandLine cliParser = new DefaultParser().parse(opts, args);

if (CMD_FORMAT_STATE_STORE.equals(cmd)) {
handFormatStateStore();
handFormatStateStore(conf);
} else if (CMD_REMOVE_APPLICATION_FROM_STATE_STORE.equals(cmd)) {
if (cliParser.hasOption(removeApplicationFromStateStoreOpt)) {
String applicationId = cliParser.getOptionValue(removeApplicationFromStateStoreOpt);
Expand Down

0 comments on commit 72d7b43

Please sign in to comment.