diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java index fa64188a608b9..9e4d1e6ed0e81 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.collections.MapUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.registry.client.api.BindFlags; @@ -142,9 +143,7 @@ public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId, // Then update the subClusterTokenMap subClusterTokenMap.put(subClusterId, token); } catch (YarnException | IOException e) { - LOG.error( - "Failed writing AMRMToken to registry for subcluster " + subClusterId, - e); + LOG.error("Failed writing AMRMToken to registry for subcluster {}.", subClusterId, e); } return update; } @@ -189,8 +188,7 @@ public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId, retMap.put(scId, amrmToken); } catch (Exception e) { - LOG.error("Failed reading registry key " + key - + ", skipping subcluster " + scId, e); + LOG.error("Failed reading registry key {}, skipping subcluster {}.", key, scId, e); } } @@ -202,24 +200,39 @@ public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId, /** * Remove an application from registry. * - * @param appId application id + * @param appId application id. */ public synchronized void removeAppFromRegistry(ApplicationId appId) { + removeAppFromRegistry(appId, false); + } + + /** + * Remove an application from registry. + * + * @param appId application id + * @param ignoreMemoryState whether to ignore the memory data in terms of + * known application + */ + public synchronized void removeAppFromRegistry(ApplicationId appId, + boolean ignoreMemoryState) { Map> subClusterTokenMap = this.appSubClusterTokenMap.get(appId); - LOG.info("Removing all registry entries for {}", appId); - - if (subClusterTokenMap == null || subClusterTokenMap.size() == 0) { - return; + if (!ignoreMemoryState) { + if (MapUtils.isEmpty(subClusterTokenMap)) { + return; + } } + LOG.info("Removing all registry entries for {}.", appId); // Lastly remove the application directory String key = getRegistryKey(appId, null); try { removeKeyRegistry(this.registry, this.user, key, true, true); - subClusterTokenMap.clear(); + if (subClusterTokenMap != null) { + subClusterTokenMap.clear(); + } } catch (YarnException e) { - LOG.error("Failed removing registry directory key " + key, e); + LOG.error("Failed removing registry directory key {}.", key, e); } } @@ -247,7 +260,7 @@ public String run() { } } catch (Throwable e) { if (throwIfFails) { - LOG.error("Registry resolve key " + key + " failed", e); + LOG.error("Registry resolve key {} failed.", key, e); } } return null; @@ -271,7 +284,7 @@ public Boolean run() { return true; } catch (Throwable e) { if (throwIfFails) { - LOG.error("Registry remove key " + key + " failed", e); + LOG.error("Registry remove key {} failed.", key, e); } } return false; @@ -300,7 +313,7 @@ public Boolean run() { return true; } catch (Throwable e) { if (throwIfFails) { - LOG.error("Registry write key " + key + " failed", e); + LOG.error("Registry write key {} failed.", key, e); } } return false; @@ -317,18 +330,15 @@ public Boolean run() { private List listDirRegistry(final RegistryOperations registryImpl, UserGroupInformation ugi, final String key, final boolean throwIfFails) throws YarnException { - List result = ugi.doAs(new PrivilegedAction>() { - @Override - public List run() { - try { - return registryImpl.list(key); - } catch (Throwable e) { - if (throwIfFails) { - LOG.error("Registry list key " + key + " failed", e); - } + List result = ugi.doAs((PrivilegedAction>) () -> { + try { + return registryImpl.list(key); + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry list key {} failed.", key, e); } - return null; } + return null; }); if (result == null && throwIfFails) { throw new YarnException("Registry list key " + key + " failed"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java index 42be851512af3..cccfbb4613c0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java @@ -87,4 +87,31 @@ public void testBasicCase() { this.registryClient.loadStateFromRegistry(appId).size()); } + @Test + public void testRemoveWithMemoryState() { + ApplicationId appId1 = ApplicationId.newInstance(0, 0); + ApplicationId appId2 = ApplicationId.newInstance(0, 1); + String scId0 = "subcluster0"; + + this.registryClient.writeAMRMTokenForUAM(appId1, scId0, new Token<>()); + this.registryClient.writeAMRMTokenForUAM(appId2, scId0, new Token<>()); + Assert.assertEquals(2, this.registryClient.getAllApplications().size()); + + // Create a new client instance + this.registryClient = + new FederationRegistryClient(this.conf, this.registry, this.user); + + this.registryClient.loadStateFromRegistry(appId2); + // Should remove app2 + this.registryClient.removeAppFromRegistry(appId2, false); + Assert.assertEquals(1, this.registryClient.getAllApplications().size()); + + // Should not remove app1 since memory state don't have it + this.registryClient.removeAppFromRegistry(appId1, false); + Assert.assertEquals(1, this.registryClient.getAllApplications().size()); + + // Should remove app1 + this.registryClient.removeAppFromRegistry(appId1, true); + Assert.assertEquals(0, this.registryClient.getAllApplications().size()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java index 6b0a5a43112b0..e54244d7133d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.globalpolicygenerator; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; /** @@ -32,4 +33,8 @@ public interface GPGContext { GPGPolicyFacade getPolicyFacade(); void setPolicyFacade(GPGPolicyFacade facade); + + FederationRegistryClient getRegistryClient(); + + void setRegistryClient(FederationRegistryClient client); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java index bb498448fae81..b14f502990182 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.globalpolicygenerator; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; /** @@ -27,6 +28,7 @@ public class GPGContextImpl implements GPGContext { private FederationStateStoreFacade facade; private GPGPolicyFacade policyFacade; + private FederationRegistryClient registryClient; @Override public FederationStateStoreFacade getStateStoreFacade() { @@ -48,4 +50,14 @@ public GPGPolicyFacade getPolicyFacade(){ public void setPolicyFacade(GPGPolicyFacade gpgPolicyfacade){ policyFacade = gpgPolicyfacade; } + + @Override + public FederationRegistryClient getRegistryClient() { + return registryClient; + } + + @Override + public void setRegistryClient(FederationRegistryClient client) { + registryClient = client; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java index ba8ce856cdaa5..7ea2f5f27277b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java @@ -37,6 +37,7 @@ import org.apache.hadoop.security.HttpCrossOriginFilterInitializer; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.JvmPauseMonitor; @@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner; import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator; import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner; @@ -81,6 +83,7 @@ public class GlobalPolicyGenerator extends CompositeService { // Federation Variables private GPGContext gpgContext; + private RegistryOperations registry; // Scheduler service that runs tasks periodically private ScheduledThreadPoolExecutor scheduledExecutorService; @@ -123,6 +126,17 @@ protected void serviceInit(Configuration conf) throws Exception { new GPGPolicyFacade(this.gpgContext.getStateStoreFacade(), conf); this.gpgContext.setPolicyFacade(gpgPolicyFacade); + this.registry = FederationStateStoreFacade.createInstance(conf, + YarnConfiguration.YARN_REGISTRY_CLASS, + YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS, + RegistryOperations.class); + this.registry.init(conf); + + UserGroupInformation user = UserGroupInformation.getCurrentUser(); + FederationRegistryClient registryClient = + new FederationRegistryClient(conf, this.registry, user); + this.gpgContext.setRegistryClient(registryClient); + this.scheduledExecutorService = new ScheduledThreadPoolExecutor( conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS, YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS)); @@ -157,6 +171,8 @@ protected void serviceStart() throws Exception { super.serviceStart(); + this.registry.start(); + // Schedule SubClusterCleaner service Configuration config = getConfig(); long scCleanerIntervalMs = config.getTimeDuration( @@ -214,6 +230,11 @@ protected void serviceStart() throws Exception { @Override protected void serviceStop() throws Exception { + if (this.registry != null) { + this.registry.stop(); + this.registry = null; + } + try { if (this.scheduledExecutorService != null && !this.scheduledExecutorService.isShutdown()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java index cd3f7618558e9..af0bd6184b797 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.commons.lang3.time.DurationFormatUtils; @@ -27,9 +28,11 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils; import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @@ -46,6 +49,7 @@ public abstract class ApplicationCleaner implements Runnable { private Configuration conf; private GPGContext gpgContext; + private FederationRegistryClient registryClient; private int minRouterSuccessCount; private int maxRouterRetry; @@ -56,6 +60,7 @@ public void init(Configuration config, GPGContext context) this.gpgContext = context; this.conf = config; + this.registryClient = context.getRegistryClient(); String routerSpecString = this.conf.get(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC, @@ -80,10 +85,9 @@ public void init(Configuration config, GPGContext context) + this.minRouterSuccessCount + " should be positive"); } - LOG.info( - "Initialized AppCleaner with Router query with min success {}, " - + "max retry {}, retry interval {}", - this.minRouterSuccessCount, this.maxRouterRetry, + LOG.info("Initialized AppCleaner with Router query with min success {}, " + + "max retry {}, retry interval {}.", this.minRouterSuccessCount, + this.maxRouterRetry, DurationFormatUtils.formatDurationISO(this.routerQueryIntevalMillis)); } @@ -100,9 +104,9 @@ public GPGContext getGPGContext() { public Set getAppsFromRouter() throws YarnRuntimeException { String webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf); - LOG.info(String.format("Contacting router at: %s", webAppAddress)); - AppsInfo appsInfo = GPGUtils.invokeRMWebService(webAppAddress, "apps", AppsInfo.class, conf, - DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString()); + LOG.info("Contacting router at: {}.", webAppAddress); + AppsInfo appsInfo = GPGUtils.invokeRMWebService(webAppAddress, RMWSConsts.APPS, + AppsInfo.class, conf, DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString()); Set appSet = new HashSet<>(); for (AppInfo appInfo : appsInfo.getApps()) { @@ -148,6 +152,18 @@ public Set getRouterKnownApplications() throws YarnException { + " success Router queries after " + totalAttemptCount + " retries"); } + protected void cleanupAppRecordInRegistry(Set knownApps) { + List allApps = this.registryClient.getAllApplications(); + LOG.info("Got {} existing apps in registry.", allApps.size()); + for (String app : allApps) { + ApplicationId appId = ApplicationId.fromString(app); + if (!knownApps.contains(appId)) { + LOG.info("removing finished application entry for {}", app); + this.registryClient.removeAppFromRegistry(appId, true); + } + } + } + @Override public abstract void run(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java index 857d2e645d4c4..5b2ff26fcfb4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java @@ -70,6 +70,8 @@ public void run() { LOG.error("deleteApplicationHomeSubCluster failed at application {}.", appId, e); } } + // Clean up registry entries + cleanupAppRecordInRegistry(routerApps); } catch (Throwable e) { LOG.error("Application cleaner started at time {} fails. ", now, e); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java index 2d63c48236fb5..1e703b51960e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java @@ -24,15 +24,21 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl; @@ -50,6 +56,8 @@ public class TestDefaultApplicationCleaner { private FederationStateStoreFacade facade; private ApplicationCleaner appCleaner; private GPGContext gpgContext; + private RegistryOperations registry; + private FederationRegistryClient registryClient; private List appIds; // The list of applications returned by mocked router @@ -68,8 +76,18 @@ public void setup() throws Exception { facade = FederationStateStoreFacade.getInstance(); facade.reinitialize(stateStore, conf); + registry = new FSRegistryOperationsService(); + registry.init(conf); + registry.start(); + + UserGroupInformation user = UserGroupInformation.getCurrentUser(); + registryClient = new FederationRegistryClient(conf, registry, user); + registryClient.cleanAllApplications(); + Assert.assertEquals(0, registryClient.getAllApplications().size()); + gpgContext = new GPGContextImpl(); gpgContext.setStateStoreFacade(facade); + gpgContext.setRegistryClient(registryClient); appCleaner = new TestableDefaultApplicationCleaner(); appCleaner.init(conf, gpgContext); @@ -87,7 +105,12 @@ public void setup() throws Exception { stateStore.addApplicationHomeSubCluster( AddApplicationHomeSubClusterRequest.newInstance( ApplicationHomeSubCluster.newInstance(appId, subClusterId))); + + // Write some registry entries for the app + registryClient.writeAMRMTokenForUAM(appId, subClusterId.toString(), + new Token()); } + Assert.assertEquals(3, registryClient.getAllApplications().size()); } @After @@ -96,6 +119,14 @@ public void breakDown() { stateStore.close(); stateStore = null; } + if (registryClient != null) { + registryClient.cleanAllApplications(); + registryClient = null; + } + if (registry != null) { + registry.stop(); + registry = null; + } } @Test @@ -116,6 +147,9 @@ public void testFederationStateStoreAppsCleanUp() throws YarnException { .getApplicationsHomeSubCluster( GetApplicationsHomeSubClusterRequest.newInstance()) .getAppsHomeSubClusters().size()); + + // The known app should not be cleaned in registry + Assert.assertEquals(1, registryClient.getAllApplications().size()); } /**