Skip to content

Commit

Permalink
YARN-11573. Add config option to make container allocation prefer nodes without reserved containers (#6098)
Browse files Browse the repository at this point in the history
  • Loading branch information
szilard-nemeth authored Sep 22, 2023
1 parent 0780710 commit 13c5825
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,5 @@ public class ActivityDiagnosticConstant {
// Diagnostic recorded when a node must unreserve a container to satisfy an
// allocation but no suitable container to unreserve can be found.
public final static String
NODE_CAN_NOT_FIND_CONTAINER_TO_BE_UNRESERVED_WHEN_NEEDED =
"Node can't find a container to be unreserved when needed";
// Diagnostic recorded when allocation skips a node because the node already
// holds a reserved container (skip-allocate-on-nodes-with-reserved-containers).
public static final String NODE_HAS_BEEN_RESERVED = "Node has been reserved";
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true;

// Config key: when set to true, the container allocator skips candidate
// nodes that already hold a reserved container, preferring unreserved nodes.
public static final String SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS = PREFIX
+ "skip-allocate-on-nodes-with-reserved-containers";

// Default is false: nodes with reserved containers remain eligible,
// preserving pre-YARN-11573 behavior.
@Private
public static final boolean DEFAULT_SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS = false;

@Private
public static final String MAXIMUM_ALLOCATION = "maximum-allocation";

Expand Down Expand Up @@ -938,6 +944,11 @@ public boolean getReservationContinueLook() {
DEFAULT_RESERVE_CONT_LOOK_ALL_NODES);
}

/**
 * Whether container allocation should skip candidate nodes that currently
 * hold a reserved container.
 *
 * @return the value of
 *         {@code skip-allocate-on-nodes-with-reserved-containers}, or
 *         {@code false} when the property is unset
 */
public boolean getSkipAllocateOnNodesWithReservedContainer() {
  final boolean skipReservedNodes = getBoolean(
      SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS,
      DEFAULT_SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS);
  return skipReservedNodes;
}

private static String getAclKey(QueueACL acl) {
return "acl_" + StringUtils.toLowerCase(acl.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityLevel;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.Container;
Expand Down Expand Up @@ -850,9 +853,23 @@ private ContainerAllocation allocate(Resource clusterResource,

Iterator<FiCaSchedulerNode> iter = schedulingPS.getPreferredNodeIterator(
candidates);

while (iter.hasNext()) {
FiCaSchedulerNode node = iter.next();

// Do not schedule on this node if it already holds a reservation and the
// skip-allocate-on-nodes-with-reserved-containers feature is enabled.
// Fix: the previous condition was also gated on iter.hasNext(), which is
// unrelated to this node's reservation state and wrongly exempted the LAST
// candidate node from the skip — the decision must depend only on the node.
if (node.getReservedContainer() != null &&
    isSkipAllocateOnNodesWithReservedContainer()) {
  LOG.debug("Skipping scheduling on node {} since it has already been"
      + " reserved by {}", node.getNodeID(),
      node.getReservedContainer().getContainerId());
  ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
      activitiesManager, node, application, schedulerKey,
      ActivityDiagnosticConstant.NODE_HAS_BEEN_RESERVED, ActivityLevel.NODE);
  continue;
}

if (reservedContainer == null) {
result = preCheckForNodeCandidateSet(node,
schedulingMode, resourceLimits, schedulerKey);
Expand Down Expand Up @@ -894,7 +911,19 @@ private ContainerAllocation allocate(Resource clusterResource,

return result;
}


/**
 * Reads the scheduler configuration flag that controls whether allocation
 * should skip nodes holding reserved containers.
 *
 * @return the configured value when the active scheduler is a
 *         {@link CapacityScheduler}; {@code false} otherwise
 */
private boolean isSkipAllocateOnNodesWithReservedContainer() {
  final ResourceScheduler activeScheduler = rmContext.getScheduler();
  if (!(activeScheduler instanceof CapacityScheduler)) {
    // Only the CapacityScheduler exposes this setting.
    return false;
  }
  final CapacitySchedulerConfiguration csConf =
      ((CapacityScheduler) activeScheduler).getConfiguration();
  return csConf.getSkipAllocateOnNodesWithReservedContainer();
}

@Override
public CSAssignment assignContainers(Resource clusterResource,
CandidateNodeSet<FiCaSchedulerNode> candidates,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.thirdparty.com.google.common.collect.Iterators;

import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.slf4j.Logger;
Expand Down Expand Up @@ -478,4 +482,110 @@ public void testMultiNodeSorterAfterHeartbeatInterval() throws Exception {
rm.stop();
}

/**
 * Verifies skip-allocate-on-nodes-with-reserved-containers: once nm1 holds a
 * reservation for app1, a subsequent request from app2 must not be placed on
 * nm1 — its reservation ends up on nm2 instead, while nm1's reservation stays.
 */
@Test(timeout=30000)
public void testSkipAllocationOnNodeReservedByAnotherApp() throws Exception {
CapacitySchedulerConfiguration newConf =
new CapacitySchedulerConfiguration(conf);
newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
// Sorting interval 0 so the multi-node iterator reflects current state.
newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+ ".resource-based.sorting-interval.ms", 0);
newConf.setMaximumApplicationMasterResourcePerQueuePercent("root.default", 1.0f);
// Feature under test.
newConf.set(CapacitySchedulerConfiguration.SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS,
"true");
MockRM rm1 = new MockRM(newConf);

rm1.start();
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8 * GB);
MockNM nm2 = rm1.registerNode("127.0.0.2:1235", 8 * GB);

// launch an app to queue, AM container should be launched in nm1
RMApp app1 = MockRMAppSubmitter.submit(rm1, MockRMAppSubmissionData.Builder
.createWithMemory(5 * GB, rm1)
.withAppName("app")
.withUser("user")
.withQueue("default")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

// launch another app to queue, AM container should be launched in nm2
RMApp app2 = MockRMAppSubmitter.submit(rm1, MockRMAppSubmissionData.Builder
.createWithMemory(5 * GB, rm1)
.withAppName("app")
.withUser("user")
.withQueue("default")
.build());
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);

CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());

// Ask a container with 4 GB memory size for app1,
// this cannot fit on either node (5 GB AM already on each), so it is
// reserved rather than allocated.
am1.allocate("*", 4 * GB, 1, new ArrayList<>());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));


// Check containers of app1 and app2.
Set<RMNode> reservedContainers = checkReservedContainers(cs,
rm1.getRMContext().getRMNodes(), 1);
Assert.assertEquals(1, reservedContainers.size());
RMNode nodeWithReservedContainer = reservedContainers.iterator().next();
LOG.debug("Reserved container on: {}", nodeWithReservedContainer);

//Move reservation to nm1 for easier testing
if (nodeWithReservedContainer.getNodeID().getHost().startsWith("127.0.0.2")) {
moveReservation(cs, rm1, nm1, nm2, am1);
}
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
Assert.assertNull(cs.getNode(nm2.getNodeId()).getReservedContainer());

Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());

//Make sure to have available headroom on the child queue,
// see: RegularContainerAllocator#checkHeadroom,
//that can make RegularContainerAllocator.preCheckForNodeCandidateSet to return
// ContainerAllocation.QUEUE_SKIPPED
// NOTE: nm3 is intentionally unused; registering it grows cluster headroom.
MockNM nm3 = rm1.registerNode("127.0.0.3:1235", 3 * GB);

//Allocate a container for app2, we expect this to be allocated on nm2 as
// nm1 has a reservation for another app
am2.allocate("*", 4 * GB, 1, new ArrayList<>());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// nm1 keeps app1's reservation; app2's request is reserved on nm2.
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
Assert.assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());

rm1.close();
}

/**
 * Moves am1's reserved container from nm2 over to nm1 so that the test can
 * assert against a fixed node layout regardless of initial placement.
 */
private static void moveReservation(CapacityScheduler cs,
MockRM rm1, MockNM nm1, MockNM nm2, MockAM am1) {
  RMNode from = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
  RMNode to = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
  FiCaSchedulerApp attempt = (FiCaSchedulerApp)
      (SchedulerApplicationAttempt) cs.getApplicationAttempt(
          am1.getApplicationAttemptId());
  RMContainer reserved =
      cs.getNode(from.getNodeID()).getReservedContainer();
  LOG.debug("Moving reservation");
  attempt.moveReservation(reserved,
      cs.getNode(from.getNodeID()), cs.getNode(to.getNodeID()));
}

/**
 * Collects every RMNode that currently holds a reserved container and
 * asserts that their count equals the expectation.
 *
 * @return the set of nodes with a reserved container
 */
private static Set<RMNode> checkReservedContainers(CapacityScheduler cs,
ConcurrentMap<NodeId, RMNode> rmNodes, int expectedNumberOfContainers) {
  Set<RMNode> nodesWithReservation = new HashSet<>();
  rmNodes.forEach((nodeId, rmNode) -> {
    if (cs.getNode(nodeId).getReservedContainer() != null) {
      nodesWithReservation.add(rmNode);
    }
  });

  Assert.assertEquals(expectedNumberOfContainers, nodesWithReservation.size());
  return nodesWithReservation;
}
}

0 comments on commit 13c5825

Please sign in to comment.