From 13c5825c00499374f35dc3d97f16f89bb0b3e364 Mon Sep 17 00:00:00 2001
From: Szilard Nemeth <954799+szilard-nemeth@users.noreply.github.com>
Date: Fri, 22 Sep 2023 20:00:50 +0200
Subject: [PATCH] YARN-11573. Add config option to make container allocation
 prefer nodes without reserved containers (#6098)

---
 .../ActivityDiagnosticConstant.java           |   1 +
 .../CapacitySchedulerConfiguration.java       |  11 ++
 .../allocator/RegularContainerAllocator.java  |  31 ++++-
 .../TestCapacitySchedulerMultiNodes.java      | 110 ++++++++++++++++++
 4 files changed, 152 insertions(+), 1 deletion(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
index ecaa88438a546..3eb6dc24e0901 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
@@ -107,4 +107,5 @@ public class ActivityDiagnosticConstant {
   public final static String
       NODE_CAN_NOT_FIND_CONTAINER_TO_BE_UNRESERVED_WHEN_NEEDED =
       "Node can't find a container to be unreserved when needed";
+  public static final String NODE_HAS_BEEN_RESERVED = "Node has been reserved";
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 417638516251a..5ab237d282a6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -154,6 +154,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration
   @Private
   public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true;
 
+  public static final String SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS = PREFIX +
+      "skip-allocate-on-nodes-with-reserved-containers";
+
+  @Private
+  public static final boolean DEFAULT_SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS = false;
+
   @Private
   public static final String MAXIMUM_ALLOCATION = "maximum-allocation";
 
@@ -938,6 +944,11 @@ public boolean getReservationContinueLook() {
         DEFAULT_RESERVE_CONT_LOOK_ALL_NODES);
   }
 
+  public boolean getSkipAllocateOnNodesWithReservedContainer() {
+    return getBoolean(SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS,
+        DEFAULT_SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS);
+  }
+
   private static String getAclKey(QueueACL acl) {
     return "acl_" + StringUtils.toLowerCase(acl.toString());
   }
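
Note (not part of the patch): with PREFIX being "yarn.scheduler.capacity.",
the new property resolves to
yarn.scheduler.capacity.skip-allocate-on-nodes-with-reserved-containers and
defaults to false. A minimal sketch of enabling it programmatically, mirroring
what the test below does; the class name SkipFlagExample is hypothetical, and
setBoolean comes from Hadoop's Configuration, which
CapacitySchedulerConfiguration extends:

    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

    public class SkipFlagExample {
      public static void main(String[] args) {
        // Sketch: turn on skipping of nodes that already hold a reservation.
        CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
        csConf.setBoolean(
            CapacitySchedulerConfiguration.SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS,
            true);
        // The getter added above now reports the overridden value.
        System.out.println(csConf.getSkipAllocateOnNodesWithReservedContainer()); // true
      }
    }
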
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index c46b0341f74d0..f211b65b34f47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -24,8 +24,11 @@
 import java.util.Optional;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityLevel;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -850,9 +853,23 @@ private ContainerAllocation allocate(Resource clusterResource,
     Iterator<FiCaSchedulerNode> iter = schedulingPS.getPreferredNodeIterator(
         candidates);
+
     while (iter.hasNext()) {
       FiCaSchedulerNode node = iter.next();
+
+      // Do not schedule if there are any reservations to fulfill on the node
+      if (iter.hasNext() &&
+          node.getReservedContainer() != null &&
+          isSkipAllocateOnNodesWithReservedContainer()) {
+        LOG.debug("Skipping scheduling on node {} since it has already been"
+                + " reserved by {}", node.getNodeID(),
+            node.getReservedContainer().getContainerId());
+        ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+            activitiesManager, node, application, schedulerKey,
+            ActivityDiagnosticConstant.NODE_HAS_BEEN_RESERVED, ActivityLevel.NODE);
+        continue;
+      }
+
       if (reservedContainer == null) {
         result = preCheckForNodeCandidateSet(node,
             schedulingMode, resourceLimits, schedulerKey);
@@ -894,7 +911,19 @@ private ContainerAllocation allocate(Resource clusterResource,
 
     return result;
   }
-
+
+  private boolean isSkipAllocateOnNodesWithReservedContainer() {
+    ResourceScheduler scheduler = rmContext.getScheduler();
+    boolean skipAllocateOnNodesWithReservedContainer = false;
+    if (scheduler instanceof CapacityScheduler) {
+      CapacityScheduler cs = (CapacityScheduler) scheduler;
+      CapacitySchedulerConfiguration csConf = cs.getConfiguration();
+      skipAllocateOnNodesWithReservedContainer =
+          csConf.getSkipAllocateOnNodesWithReservedContainer();
+    }
+    return skipAllocateOnNodesWithReservedContainer;
+  }
+
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
       CandidateNodeSet<FiCaSchedulerNode> candidates,
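
Note (not part of the patch): the iter.hasNext() condition in the new guard
only fires while more candidates remain, so the last node returned by the
iterator is never skipped and the application can still allocate or reserve
there even if every node holds a reservation. A self-contained sketch of that
pattern, using a hypothetical Node type rather than the YARN API:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    final class SkipReservedSketch {
      static final class Node {
        final String id;
        final boolean reserved;
        Node(String id, boolean reserved) {
          this.id = id;
          this.reserved = reserved;
        }
      }

      // Return the first candidate without a reservation, but fall back to
      // the final candidate even if it is reserved (mirrors the guard above).
      static Node pickNode(List<Node> candidates, boolean skipReserved) {
        Iterator<Node> iter = candidates.iterator();
        while (iter.hasNext()) {
          Node node = iter.next();
          if (iter.hasNext() && node.reserved && skipReserved) {
            continue; // more candidates remain, so this reserved node is skipped
          }
          return node;
        }
        return null; // empty candidate list
      }

      public static void main(String[] args) {
        // Both candidates reserved: the last one is still returned.
        Node picked = pickNode(Arrays.asList(
            new Node("nm1", true), new Node("nm2", true)), true);
        System.out.println(picked.id); // nm2
      }
    }
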
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
index 48e8569f0f7c5..b08903893ffeb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
@@ -25,12 +25,16 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.thirdparty.com.google.common.collect.Iterators;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.slf4j.Logger;
@@ -478,4 +482,110 @@ public void testMultiNodeSorterAfterHeartbeatInterval() throws Exception {
     rm.stop();
   }
 
+  @Test(timeout=30000)
+  public void testSkipAllocationOnNodeReservedByAnotherApp() throws Exception {
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+    newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+        + ".resource-based.sorting-interval.ms", 0);
+    newConf.setMaximumApplicationMasterResourcePerQueuePercent("root.default", 1.0f);
+    newConf.set(CapacitySchedulerConfiguration.SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS,
+        "true");
+    MockRM rm1 = new MockRM(newConf);
+
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8 * GB);
+    MockNM nm2 = rm1.registerNode("127.0.0.2:1235", 8 * GB);
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = MockRMAppSubmitter.submit(rm1, MockRMAppSubmissionData.Builder
+        .createWithMemory(5 * GB, rm1)
+        .withAppName("app")
+        .withUser("user")
+        .withQueue("default")
+        .build());
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // launch another app to queue, AM container should be launched in nm2
+    RMApp app2 = MockRMAppSubmitter.submit(rm1, MockRMAppSubmissionData.Builder
+        .createWithMemory(5 * GB, rm1)
+        .withAppName("app")
+        .withUser("user")
+        .withQueue("default")
+        .build());
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    FiCaSchedulerApp schedulerApp1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp schedulerApp2 =
+        cs.getApplicationAttempt(am2.getApplicationAttemptId());
+
+    // Ask a container with 4 GB memory size for app1,
+    am1.allocate("*", 4 * GB, 1, new ArrayList<>());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // Check containers of app1 and app2.
+    Set<RMNode> reservedContainers = checkReservedContainers(cs,
+        rm1.getRMContext().getRMNodes(), 1);
+    Assert.assertEquals(1, reservedContainers.size());
+    RMNode nodeWithReservedContainer = reservedContainers.iterator().next();
+    LOG.debug("Reserved container on: {}", nodeWithReservedContainer);
+
+    //Move reservation to nm1 for easier testing
+    if (nodeWithReservedContainer.getNodeID().getHost().startsWith("127.0.0.2")) {
+      moveReservation(cs, rm1, nm1, nm2, am1);
+    }
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
+
+    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
+
+    //Make sure to have available headroom on the child queue,
+    // see: RegularContainerAllocator#checkHeadroom,
+    //that can make RegularContainerAllocator.preCheckForNodeCandidateSet to return
+    // ContainerAllocation.QUEUE_SKIPPED
+    MockNM nm3 = rm1.registerNode("127.0.0.3:1235", 3 * GB);
+
+    //Allocate a container for app2, we expect this to be allocated on nm2 as
+    // nm1 has a reservation for another app
+    am2.allocate("*", 4 * GB, 1, new ArrayList<>());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
+
+    rm1.close();
+  }
+
+  private static void moveReservation(CapacityScheduler cs,
+      MockRM rm1, MockNM nm1, MockNM nm2, MockAM am1) {
+    RMNode sourceNode = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    RMNode targetNode = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    SchedulerApplicationAttempt firstSchedulerAppAttempt =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp app = (FiCaSchedulerApp)firstSchedulerAppAttempt;
+    RMContainer reservedContainer = cs.getNode(sourceNode.getNodeID()).getReservedContainer();
+    LOG.debug("Moving reservation");
+    app.moveReservation(reservedContainer,
+        cs.getNode(sourceNode.getNodeID()), cs.getNode(targetNode.getNodeID()));
+  }
+
+  private static Set<RMNode> checkReservedContainers(CapacityScheduler cs,
+      ConcurrentMap<NodeId, RMNode> rmNodes, int expectedNumberOfContainers) {
+    Set<RMNode> result = new HashSet<>();
+    for (Map.Entry<NodeId, RMNode> entry : rmNodes.entrySet()) {
+      if (cs.getNode(entry.getKey()).getReservedContainer() != null) {
+        result.add(entry.getValue());
+      }
+    }
+
+    Assert.assertEquals(expectedNumberOfContainers, result.size());
+    return result;
+  }
 }
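
Note (not part of the patch): rough arithmetic behind registering the 3 GB
nm3 in the test above, using the test's own numbers. The real check is
RegularContainerAllocator#checkHeadroom and also involves queue and user
limits, so this is only a sketch with an assumed simplified model:

    public class HeadroomArithmeticSketch {
      public static void main(String[] args) {
        // Before nm3: cluster = 8 + 8 = 16 GB, used = 5 (AM1) + 5 (AM2) = 10 GB,
        // and app1 holds a pending 4 GB reservation: 16 - 10 - 4 = 2 GB < 4 GB,
        // so app2's ask can end up as ContainerAllocation.QUEUE_SKIPPED.
        int clusterGb = 8 + 8 + 3; // nm1 + nm2 + nm3 raise the cluster to 19 GB
        int usedGb = 5 + 5;        // the two AM containers
        int reservedGb = 4;        // app1's pending reservation
        int askGb = 4;             // app2's outstanding 4 GB request
        // After nm3: 19 - 10 - 4 = 5 GB >= 4 GB, so the ask clears the headroom
        // check and, with the new flag on, is reserved on nm2 rather than nm1.
        System.out.println(clusterGb - usedGb - reservedGb >= askGb); // true
      }
    }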