Skip to content

Commit

Permalink
YARN-11514. Extend SchedulerResponse with capacityVector (#5989)
Browse files Browse the repository at this point in the history
Co-authored-by: Benjamin Teke <[email protected]>
  • Loading branch information
brumi1024 and bteke authored Sep 25, 2023
1 parent bf9975a commit f51162d
Show file tree
Hide file tree
Showing 49 changed files with 3,389 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,5 +281,9 @@ public double getResourceValue() {
public String getResourceName() {
return resourceName;
}

public String getResourceWithPostfix() {
return resourceValue + vectorResourceType.getPostfix();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.ArrayList;
import java.util.List;

import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo.getSortedQueueAclInfoList;

@XmlRootElement(name = "capacityScheduler")
Expand All @@ -47,6 +48,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
protected float maxCapacity;
protected float weight;
protected float normalizedWeight;
protected QueueCapacityVectorInfo queueCapacityVectorInfo;
protected String queueName;
private String queuePath;
protected int maxParallelApps;
Expand Down Expand Up @@ -78,6 +80,8 @@ public CapacitySchedulerInfo(CSQueue parent, CapacityScheduler cs) {
this.queuePath = parent.getQueuePath();
this.usedCapacity = parent.getUsedCapacity() * 100;
this.capacity = parent.getCapacity() * 100;
this.queueCapacityVectorInfo = new QueueCapacityVectorInfo(
parent.getConfiguredCapacityVector(NO_LABEL));
float max = parent.getMaximumCapacity();
if (max < EPSILON || max > 1f)
max = 1f;
Expand All @@ -86,8 +90,7 @@ public CapacitySchedulerInfo(CSQueue parent, CapacityScheduler cs) {
this.normalizedWeight = parent.getQueueCapacities().getNormalizedWeight();
this.maxParallelApps = parent.getMaxParallelApps();

capacities = new QueueCapacitiesInfo(parent.getQueueCapacities(),
parent.getQueueResourceQuotas(), false);
capacities = new QueueCapacitiesInfo(parent, false);
queues = getQueues(cs, parent);
health = new CapacitySchedulerHealthInfo(cs);
maximumAllocation = new ResourceInfo(parent.getMaximumAllocation());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@
import javax.xml.bind.annotation.XmlTransient;

import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.AutoCreatedLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UserInfo;

@XmlRootElement
Expand Down Expand Up @@ -105,9 +104,8 @@ protected void populateQueueResourceUsage(ResourceUsage queueResourceUsage) {
}

@Override
protected void populateQueueCapacities(QueueCapacities qCapacities,
QueueResourceQuotas qResQuotas) {
capacities = new QueueCapacitiesInfo(qCapacities, qResQuotas);
protected void populateQueueCapacities(CSQueue queue) {
capacities = new QueueCapacitiesInfo(queue, true);
}

public int getNumActiveApplications() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.helper.CapacitySchedulerInfoHelper;

@XmlRootElement
Expand Down Expand Up @@ -136,9 +134,7 @@ public class CapacitySchedulerQueueInfo {
nodeLabels.addAll(labelSet);
Collections.sort(nodeLabels);
}
QueueCapacities qCapacities = q.getQueueCapacities();
QueueResourceQuotas qResQuotas = q.getQueueResourceQuotas();
populateQueueCapacities(qCapacities, qResQuotas);
populateQueueCapacities(q);

mode = CapacitySchedulerInfoHelper.getMode(q);
queueType = CapacitySchedulerInfoHelper.getQueueType(q);
Expand Down Expand Up @@ -210,10 +206,8 @@ protected void populateQueueResourceUsage(ResourceUsage queueResourceUsage) {
resources = new ResourcesInfo(queueResourceUsage, false);
}

protected void populateQueueCapacities(QueueCapacities qCapacities,
QueueResourceQuotas qResQuotas) {
capacities = new QueueCapacitiesInfo(qCapacities, qResQuotas,
false);
protected void populateQueueCapacities(CSQueue queue) {
capacities = new QueueCapacitiesInfo(queue, false);
}

public float getCapacity() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
public class PartitionQueueCapacitiesInfo {
private String partitionName;

private QueueCapacityVectorInfo queueCapacityVectorInfo;
private float capacity;
private float usedCapacity;
private float maxCapacity = 100;
Expand All @@ -49,13 +50,15 @@ public class PartitionQueueCapacitiesInfo {
public PartitionQueueCapacitiesInfo() {
}

public PartitionQueueCapacitiesInfo(String partitionName, float capacity,
float usedCapacity, float maxCapacity, float absCapacity,
public PartitionQueueCapacitiesInfo(String partitionName,
QueueCapacityVectorInfo queueCapacityVectorInfo,
float capacity, float usedCapacity, float maxCapacity, float absCapacity,
float absUsedCapacity, float absMaxCapacity, float maxAMLimitPercentage,
float weight, float normalizedWeight,
Resource confMinRes, Resource confMaxRes, Resource effMinRes,
Resource effMaxRes) {
super();
this.queueCapacityVectorInfo = queueCapacityVectorInfo;
this.partitionName = partitionName;
this.capacity = capacity;
this.usedCapacity = usedCapacity;
Expand All @@ -72,6 +75,14 @@ public PartitionQueueCapacitiesInfo(String partitionName, float capacity,
this.effectiveMaxResource = new ResourceInfo(effMaxRes);
}

public QueueCapacityVectorInfo getQueueCapacityVectorInfo() {
return queueCapacityVectorInfo;
}

public void setQueueCapacityVectorInfo(QueueCapacityVectorInfo queueCapacityVectorInfo) {
this.queueCapacityVectorInfo = queueCapacityVectorInfo;
}

public float getCapacity() {
return capacity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import javax.xml.bind.annotation.XmlRootElement;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;

/**
Expand All @@ -39,12 +40,13 @@ public class QueueCapacitiesInfo {
public QueueCapacitiesInfo() {
}

public QueueCapacitiesInfo(QueueCapacities capacities,
QueueResourceQuotas resourceQuotas,
boolean considerAMUsage) {
public QueueCapacitiesInfo(CSQueue queue, boolean considerAMUsage) {
QueueCapacities capacities = queue.getQueueCapacities();
QueueResourceQuotas resourceQuotas = queue.getQueueResourceQuotas();
if (capacities == null) {
return;
}
QueueCapacityVectorInfo queueCapacityVectorInfo;
float capacity;
float usedCapacity;
float maxCapacity;
Expand All @@ -55,6 +57,8 @@ public QueueCapacitiesInfo(QueueCapacities capacities,
float weight;
float normalizedWeight;
for (String partitionName : capacities.getExistingNodeLabels()) {
queueCapacityVectorInfo = new QueueCapacityVectorInfo(
queue.getConfiguredCapacityVector(partitionName));
usedCapacity = capacities.getUsedCapacity(partitionName) * 100;
capacity = capacities.getCapacity(partitionName) * 100;
maxCapacity = capacities.getMaximumCapacity(partitionName);
Expand All @@ -72,7 +76,7 @@ public QueueCapacitiesInfo(QueueCapacities capacities,
weight = capacities.getWeight(partitionName);
normalizedWeight = capacities.getNormalizedWeight(partitionName);
queueCapacitiesByPartition.add(new PartitionQueueCapacitiesInfo(
partitionName, capacity, usedCapacity, maxCapacity, absCapacity,
partitionName, queueCapacityVectorInfo, capacity, usedCapacity, maxCapacity, absCapacity,
absUsedCapacity, absMaxCapacity,
considerAMUsage ? maxAMLimitPercentage : 0f,
weight, normalizedWeight,
Expand All @@ -83,11 +87,6 @@ public QueueCapacitiesInfo(QueueCapacities capacities,
}
}

public QueueCapacitiesInfo(QueueCapacities capacities,
QueueResourceQuotas resourceQuotas) {
this(capacities, resourceQuotas, true);
}

public void add(PartitionQueueCapacitiesInfo partitionQueueCapacitiesInfo) {
queueCapacitiesByPartition.add(partitionQueueCapacitiesInfo);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;

@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class QueueCapacityVectorEntryInfo {
private String resourceName;
private String resourceValue;

public QueueCapacityVectorEntryInfo() {
}

public QueueCapacityVectorEntryInfo(String resourceName, String resourceValue) {
this.resourceName = resourceName;
this.resourceValue = resourceValue;
}

public String getResourceName() {
return this.resourceName;
}

public void setResourceName(String resourceName) {
this.resourceName = resourceName;
}

public String getResourceValue() {
return this.resourceValue;
}

public void setResourceValue(String resourceValue) {
this.resourceValue = resourceValue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;

import java.util.ArrayList;
import java.util.List;

@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class QueueCapacityVectorInfo {
private String configuredCapacityVector;
private List<QueueCapacityVectorEntryInfo> capacityVectorEntries;

public QueueCapacityVectorInfo() {
}

public QueueCapacityVectorInfo(QueueCapacityVector queueCapacityVector) {
this.configuredCapacityVector = queueCapacityVector.toString();
this.capacityVectorEntries = new ArrayList<>();
for (QueueCapacityVector.QueueCapacityVectorEntry
queueCapacityVectorEntry : queueCapacityVector) {
this.capacityVectorEntries.add(
new QueueCapacityVectorEntryInfo(queueCapacityVectorEntry.getResourceName(),
queueCapacityVectorEntry.getResourceWithPostfix()));
}
}

public String getConfiguredCapacityVector() {
return configuredCapacityVector;
}

public void setConfiguredCapacityVector(String configuredCapacityVector) {
this.configuredCapacityVector = configuredCapacityVector;
}

public List<QueueCapacityVectorEntryInfo> getCapacityVectorEntries() {
return capacityVectorEntries;
}

public void setCapacityVectorEntries(List<QueueCapacityVectorEntryInfo> capacityVectorEntries) {
this.capacityVectorEntries = capacityVectorEntries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ private void verifySchedulerInfoJson(JSONObject json)
JSONObject info = json.getJSONObject("scheduler");
assertEquals("incorrect number of elements", 1, info.length());
info = info.getJSONObject("schedulerInfo");
assertEquals("incorrect number of elements", 24, info.length());
assertEquals("incorrect number of elements", 25, info.length());
JSONObject capacitiesJsonObject = info.getJSONObject(CAPACITIES);
JSONArray partitionsCapsArray =
capacitiesJsonObject.getJSONArray(QUEUE_CAPACITIES_BY_PARTITION);
Expand Down
Loading

0 comments on commit f51162d

Please sign in to comment.