Skip to content

Commit

Permalink
YARN-8862. [BackPort] [GPG] Add Yarn Registry cleanup in ApplicationC…
Browse files Browse the repository at this point in the history
…leaner. (#6083) Contributed by Shilun Fan.

Reviewed-by: Inigo Goiri <[email protected]>
Signed-off-by: Shilun Fan <[email protected]>
  • Loading branch information
slfan1989 authored Sep 28, 2023
1 parent 35c42e4 commit 1d2afc5
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.collections.MapUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.registry.client.api.BindFlags;
Expand Down Expand Up @@ -142,9 +143,7 @@ public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId,
// Then update the subClusterTokenMap
subClusterTokenMap.put(subClusterId, token);
} catch (YarnException | IOException e) {
LOG.error(
"Failed writing AMRMToken to registry for subcluster " + subClusterId,
e);
LOG.error("Failed writing AMRMToken to registry for subcluster {}.", subClusterId, e);
}
return update;
}
Expand Down Expand Up @@ -189,8 +188,7 @@ public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId,

retMap.put(scId, amrmToken);
} catch (Exception e) {
LOG.error("Failed reading registry key " + key
+ ", skipping subcluster " + scId, e);
LOG.error("Failed reading registry key {}, skipping subcluster {}.", key, scId, e);
}
}

Expand All @@ -202,24 +200,39 @@ public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId,
/**
* Remove an application from registry.
*
* @param appId application id
* @param appId application id.
*/
public synchronized void removeAppFromRegistry(ApplicationId appId) {
removeAppFromRegistry(appId, false);
}

/**
* Remove an application from registry.
*
* @param appId application id
* @param ignoreMemoryState whether to ignore the memory data in terms of
* known application
*/
public synchronized void removeAppFromRegistry(ApplicationId appId,
boolean ignoreMemoryState) {
Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
this.appSubClusterTokenMap.get(appId);
LOG.info("Removing all registry entries for {}", appId);

if (subClusterTokenMap == null || subClusterTokenMap.size() == 0) {
return;
if (!ignoreMemoryState) {
if (MapUtils.isEmpty(subClusterTokenMap)) {
return;
}
}
LOG.info("Removing all registry entries for {}.", appId);

// Lastly remove the application directory
String key = getRegistryKey(appId, null);
try {
removeKeyRegistry(this.registry, this.user, key, true, true);
subClusterTokenMap.clear();
if (subClusterTokenMap != null) {
subClusterTokenMap.clear();
}
} catch (YarnException e) {
LOG.error("Failed removing registry directory key " + key, e);
LOG.error("Failed removing registry directory key {}.", key, e);
}
}

Expand Down Expand Up @@ -247,7 +260,7 @@ public String run() {
}
} catch (Throwable e) {
if (throwIfFails) {
LOG.error("Registry resolve key " + key + " failed", e);
LOG.error("Registry resolve key {} failed.", key, e);
}
}
return null;
Expand All @@ -271,7 +284,7 @@ public Boolean run() {
return true;
} catch (Throwable e) {
if (throwIfFails) {
LOG.error("Registry remove key " + key + " failed", e);
LOG.error("Registry remove key {} failed.", key, e);
}
}
return false;
Expand Down Expand Up @@ -300,7 +313,7 @@ public Boolean run() {
return true;
} catch (Throwable e) {
if (throwIfFails) {
LOG.error("Registry write key " + key + " failed", e);
LOG.error("Registry write key {} failed.", key, e);
}
}
return false;
Expand All @@ -317,18 +330,15 @@ public Boolean run() {
private List<String> listDirRegistry(final RegistryOperations registryImpl,
UserGroupInformation ugi, final String key, final boolean throwIfFails)
throws YarnException {
List<String> result = ugi.doAs(new PrivilegedAction<List<String>>() {
@Override
public List<String> run() {
try {
return registryImpl.list(key);
} catch (Throwable e) {
if (throwIfFails) {
LOG.error("Registry list key " + key + " failed", e);
}
List<String> result = ugi.doAs((PrivilegedAction<List<String>>) () -> {
try {
return registryImpl.list(key);
} catch (Throwable e) {
if (throwIfFails) {
LOG.error("Registry list key {} failed.", key, e);
}
return null;
}
return null;
});
if (result == null && throwIfFails) {
throw new YarnException("Registry list key " + key + " failed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,31 @@ public void testBasicCase() {
this.registryClient.loadStateFromRegistry(appId).size());
}

@Test
public void testRemoveWithMemoryState() {
ApplicationId appId1 = ApplicationId.newInstance(0, 0);
ApplicationId appId2 = ApplicationId.newInstance(0, 1);
String scId0 = "subcluster0";

this.registryClient.writeAMRMTokenForUAM(appId1, scId0, new Token<>());
this.registryClient.writeAMRMTokenForUAM(appId2, scId0, new Token<>());
Assert.assertEquals(2, this.registryClient.getAllApplications().size());

// Create a new client instance
this.registryClient =
new FederationRegistryClient(this.conf, this.registry, this.user);

this.registryClient.loadStateFromRegistry(appId2);
// Should remove app2
this.registryClient.removeAppFromRegistry(appId2, false);
Assert.assertEquals(1, this.registryClient.getAllApplications().size());

// Should not remove app1 since memory state don't have it
this.registryClient.removeAppFromRegistry(appId1, false);
Assert.assertEquals(1, this.registryClient.getAllApplications().size());

// Should remove app1
this.registryClient.removeAppFromRegistry(appId1, true);
Assert.assertEquals(0, this.registryClient.getAllApplications().size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.globalpolicygenerator;

import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;

/**
Expand All @@ -32,4 +33,8 @@ public interface GPGContext {
GPGPolicyFacade getPolicyFacade();

void setPolicyFacade(GPGPolicyFacade facade);

FederationRegistryClient getRegistryClient();

void setRegistryClient(FederationRegistryClient client);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.globalpolicygenerator;

import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;

/**
Expand All @@ -27,6 +28,7 @@ public class GPGContextImpl implements GPGContext {

private FederationStateStoreFacade facade;
private GPGPolicyFacade policyFacade;
private FederationRegistryClient registryClient;

@Override
public FederationStateStoreFacade getStateStoreFacade() {
Expand All @@ -48,4 +50,14 @@ public GPGPolicyFacade getPolicyFacade(){
public void setPolicyFacade(GPGPolicyFacade gpgPolicyfacade){
policyFacade = gpgPolicyfacade;
}

@Override
public FederationRegistryClient getRegistryClient() {
return registryClient;
}

@Override
public void setRegistryClient(FederationRegistryClient client) {
registryClient = client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.JvmPauseMonitor;
Expand All @@ -46,6 +47,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner;
import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
Expand Down Expand Up @@ -81,6 +83,7 @@ public class GlobalPolicyGenerator extends CompositeService {

// Federation Variables
private GPGContext gpgContext;
private RegistryOperations registry;

// Scheduler service that runs tasks periodically
private ScheduledThreadPoolExecutor scheduledExecutorService;
Expand Down Expand Up @@ -123,6 +126,17 @@ protected void serviceInit(Configuration conf) throws Exception {
new GPGPolicyFacade(this.gpgContext.getStateStoreFacade(), conf);
this.gpgContext.setPolicyFacade(gpgPolicyFacade);

this.registry = FederationStateStoreFacade.createInstance(conf,
YarnConfiguration.YARN_REGISTRY_CLASS,
YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS,
RegistryOperations.class);
this.registry.init(conf);

UserGroupInformation user = UserGroupInformation.getCurrentUser();
FederationRegistryClient registryClient =
new FederationRegistryClient(conf, this.registry, user);
this.gpgContext.setRegistryClient(registryClient);

this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
Expand Down Expand Up @@ -157,6 +171,8 @@ protected void serviceStart() throws Exception {

super.serviceStart();

this.registry.start();

// Schedule SubClusterCleaner service
Configuration config = getConfig();
long scCleanerIntervalMs = config.getTimeDuration(
Expand Down Expand Up @@ -214,6 +230,11 @@ protected void serviceStart() throws Exception {

@Override
protected void serviceStop() throws Exception {
if (this.registry != null) {
this.registry.stop();
this.registry = null;
}

try {
if (this.scheduledExecutorService != null
&& !this.scheduledExecutorService.isShutdown()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.lang3.time.DurationFormatUtils;
Expand All @@ -27,9 +28,11 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
Expand All @@ -46,6 +49,7 @@ public abstract class ApplicationCleaner implements Runnable {

private Configuration conf;
private GPGContext gpgContext;
private FederationRegistryClient registryClient;

private int minRouterSuccessCount;
private int maxRouterRetry;
Expand All @@ -56,6 +60,7 @@ public void init(Configuration config, GPGContext context)

this.gpgContext = context;
this.conf = config;
this.registryClient = context.getRegistryClient();

String routerSpecString =
this.conf.get(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC,
Expand All @@ -80,10 +85,9 @@ public void init(Configuration config, GPGContext context)
+ this.minRouterSuccessCount + " should be positive");
}

LOG.info(
"Initialized AppCleaner with Router query with min success {}, "
+ "max retry {}, retry interval {}",
this.minRouterSuccessCount, this.maxRouterRetry,
LOG.info("Initialized AppCleaner with Router query with min success {}, " +
"max retry {}, retry interval {}.", this.minRouterSuccessCount,
this.maxRouterRetry,
DurationFormatUtils.formatDurationISO(this.routerQueryIntevalMillis));
}

Expand All @@ -100,9 +104,9 @@ public GPGContext getGPGContext() {
public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
String webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf);

LOG.info(String.format("Contacting router at: %s", webAppAddress));
AppsInfo appsInfo = GPGUtils.invokeRMWebService(webAppAddress, "apps", AppsInfo.class, conf,
DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString());
LOG.info("Contacting router at: {}.", webAppAddress);
AppsInfo appsInfo = GPGUtils.invokeRMWebService(webAppAddress, RMWSConsts.APPS,
AppsInfo.class, conf, DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString());

Set<ApplicationId> appSet = new HashSet<>();
for (AppInfo appInfo : appsInfo.getApps()) {
Expand Down Expand Up @@ -148,6 +152,18 @@ public Set<ApplicationId> getRouterKnownApplications() throws YarnException {
+ " success Router queries after " + totalAttemptCount + " retries");
}

protected void cleanupAppRecordInRegistry(Set<ApplicationId> knownApps) {
List<String> allApps = this.registryClient.getAllApplications();
LOG.info("Got {} existing apps in registry.", allApps.size());
for (String app : allApps) {
ApplicationId appId = ApplicationId.fromString(app);
if (!knownApps.contains(appId)) {
LOG.info("removing finished application entry for {}", app);
this.registryClient.removeAppFromRegistry(appId, true);
}
}
}

@Override
public abstract void run();
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public void run() {
LOG.error("deleteApplicationHomeSubCluster failed at application {}.", appId, e);
}
}
// Clean up registry entries
cleanupAppRecordInRegistry(routerApps);
} catch (Throwable e) {
LOG.error("Application cleaner started at time {} fails. ", now, e);
}
Expand Down
Loading

0 comments on commit 1d2afc5

Please sign in to comment.