Skip to content

Commit

Permalink
HDFS-17642. Add target node list, exclude source node list, and exclu…
Browse files Browse the repository at this point in the history
…de target node list parameters to balancer (#7127)

HDFS-17642. Add target node list, exclude source node list, and exclude target node list parameters to balancer (#7127)

---------

Co-authored-by: Joseph DellAringa <[email protected]>
  • Loading branch information
Jtdellaringa and Joseph DellAringa authored Nov 7, 2024
1 parent 487727a commit 9657276
Show file tree
Hide file tree
Showing 3 changed files with 382 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ public class Balancer {
+ "\tIncludes only the specified datanodes."
+ "\n\t[-source [-f <hosts-file> | <comma-separated list of hosts>]]"
+ "\tPick only the specified datanodes as source nodes."
+ "\n\t[-excludeSource [-f <hosts-file> | <comma-separated list of hosts>]]"
+ "\tExcludes the specified datanodes to be selected as a source."
+ "\n\t[-target [-f <hosts-file> | <comma-separated list of hosts>]]"
+ "\tPick only the specified datanodes as target nodes."
+ "\n\t[-excludeTarget [-f <hosts-file> | <comma-separated list of hosts>]]"
+ "\tExcludes the specified datanodes from being selected as a target."
+ "\n\t[-blockpools <comma-separated list of blockpool ids>]"
+ "\tThe balancer will only run on blockpools included in this list."
+ "\n\t[-idleiterations <idleiterations>]"
Expand Down Expand Up @@ -224,6 +230,9 @@ public class Balancer {
private final NameNodeConnector nnc;
private final BalancingPolicy policy;
private final Set<String> sourceNodes;
private final Set<String> excludedSourceNodes;
private final Set<String> targetNodes;
private final Set<String> excludedTargetNodes;
private final boolean runDuringUpgrade;
private final double threshold;
private final long maxSizeToMove;
Expand Down Expand Up @@ -353,6 +362,9 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
this.threshold = p.getThreshold();
this.policy = p.getBalancingPolicy();
this.sourceNodes = p.getSourceNodes();
this.excludedSourceNodes = p.getExcludedSourceNodes();
this.targetNodes = p.getTargetNodes();
this.excludedTargetNodes = p.getExcludedTargetNodes();
this.runDuringUpgrade = p.getRunDuringUpgrade();
this.sortTopNodes = p.getSortTopNodes();
this.limitOverUtilizedNum = p.getLimitOverUtilizedNum();
Expand Down Expand Up @@ -411,18 +423,26 @@ private long init(List<DatanodeStorageReport> reports) {
long overLoadedBytes = 0L, underLoadedBytes = 0L;
for(DatanodeStorageReport r : reports) {
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo());
final boolean isValidSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo()) &&
!Util.isExcluded(excludedSourceNodes, dn.getDatanodeInfo());
final boolean isValidTarget = Util.isIncluded(targetNodes, dn.getDatanodeInfo()) &&
!Util.isExcluded(excludedTargetNodes, dn.getDatanodeInfo());
for(StorageType t : StorageType.getMovableTypes()) {
final Double utilization = policy.getUtilization(r, t);
if (utilization == null) { // datanode does not have such storage type
continue;
}

final double average = policy.getAvgUtilization(t);
if (utilization >= average && !isSource) {
LOG.info(dn + "[" + t + "] has utilization=" + utilization
+ " >= average=" + average
+ " but it is not specified as a source; skipping it.");
if (utilization >= average && !isValidSource) {
LOG.info("{} [{}] utilization {} >= average {}, but it's either not specified"
+ " or excluded as a source; skipping.", dn, t, utilization, average);
continue;
}
if (utilization <= average && !isValidTarget) {
LOG.info("{} [{}] utilization {} <= average {}, but it's either not specified"
+ " or excluded as a target; skipping.",
dn, t, utilization, average);
continue;
}

Expand Down Expand Up @@ -829,6 +849,9 @@ static private int doBalance(Collection<URI> namenodes,
LOG.info("included nodes = " + p.getIncludedNodes());
LOG.info("excluded nodes = " + p.getExcludedNodes());
LOG.info("source nodes = " + p.getSourceNodes());
LOG.info("excluded source nodes = " + p.getExcludedSourceNodes());
LOG.info("target nodes = " + p.getTargetNodes());
LOG.info("excluded target nodes = " + p.getExcludedTargetNodes());
checkKeytabAndInit(conf);
System.out.println("Time Stamp Iteration#"
+ " Bytes Already Moved Bytes Left To Move Bytes Being Moved"
Expand Down Expand Up @@ -1016,6 +1039,10 @@ public int run(String[] args) {
static BalancerParameters parse(String[] args) {
Set<String> excludedNodes = null;
Set<String> includedNodes = null;
Set<String> sourceNodes = null;
Set<String> excludedSourceNodes = null;
Set<String> targetNodes = null;
Set<String> excludedTargetNodes = null;
BalancerParameters.Builder b = new BalancerParameters.Builder();

if (args != null) {
Expand Down Expand Up @@ -1056,9 +1083,21 @@ static BalancerParameters parse(String[] args) {
i = processHostList(args, i, "include", includedNodes);
b.setIncludedNodes(includedNodes);
} else if ("-source".equalsIgnoreCase(args[i])) {
Set<String> sourceNodes = new HashSet<>();
sourceNodes = new HashSet<>();
i = processHostList(args, i, "source", sourceNodes);
b.setSourceNodes(sourceNodes);
} else if ("-excludeSource".equalsIgnoreCase(args[i])) {
excludedSourceNodes = new HashSet<>();
i = processHostList(args, i, "exclude source", excludedSourceNodes);
b.setExcludedSourceNodes(excludedSourceNodes);
} else if ("-target".equalsIgnoreCase(args[i])) {
targetNodes = new HashSet<>();
i = processHostList(args, i, "target", targetNodes);
b.setTargetNodes(targetNodes);
} else if ("-excludeTarget".equalsIgnoreCase(args[i])) {
excludedTargetNodes = new HashSet<>();
i = processHostList(args, i, "exclude target", excludedTargetNodes);
b.setExcludedTargetNodes(excludedTargetNodes);
} else if ("-blockpools".equalsIgnoreCase(args[i])) {
Preconditions.checkArgument(
++i < args.length,
Expand Down Expand Up @@ -1111,6 +1150,10 @@ static BalancerParameters parse(String[] args) {
}
Preconditions.checkArgument(excludedNodes == null || includedNodes == null,
"-exclude and -include options cannot be specified together.");
Preconditions.checkArgument(excludedSourceNodes == null || sourceNodes == null,
"-excludeSource and -source options cannot be specified together.");
Preconditions.checkArgument(excludedTargetNodes == null || targetNodes == null,
"-excludeTarget and -target options cannot be specified together.");
} catch(RuntimeException e) {
printUsage(System.err);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ final class BalancerParameters {
* source nodes.
*/
private final Set<String> sourceNodes;
/**
* If empty, any node can be a source; otherwise, these nodes will be excluded as
* source nodes.
*/
private final Set<String> excludedSourceNodes;
/**
* If empty, any node can be a target; otherwise, use only these nodes as
* target nodes.
*/
private final Set<String> targetNodes;
/**
* If empty, any node can be a target; otherwise, these nodes will be excluded as
* target nodes.
*/
private final Set<String> excludedTargetNodes;
/**
* A set of block pools to run the balancer on.
*/
Expand Down Expand Up @@ -65,6 +80,9 @@ private BalancerParameters(Builder builder) {
this.excludedNodes = builder.excludedNodes;
this.includedNodes = builder.includedNodes;
this.sourceNodes = builder.sourceNodes;
this.excludedSourceNodes = builder.excludedSourceNodes;
this.targetNodes = builder.targetNodes;
this.excludedTargetNodes = builder.excludedTargetNodes;
this.blockpools = builder.blockpools;
this.runDuringUpgrade = builder.runDuringUpgrade;
this.runAsService = builder.runAsService;
Expand Down Expand Up @@ -97,6 +115,18 @@ Set<String> getSourceNodes() {
return this.sourceNodes;
}

Set<String> getExcludedSourceNodes() {
return this.excludedSourceNodes;
}

Set<String> getTargetNodes() {
return this.targetNodes;
}

Set<String> getExcludedTargetNodes() {
return this.excludedTargetNodes;
}

Set<String> getBlockPools() {
return this.blockpools;
}
Expand Down Expand Up @@ -126,12 +156,15 @@ public String toString() {
return String.format("%s.%s [%s," + " threshold = %s,"
+ " max idle iteration = %s," + " #excluded nodes = %s,"
+ " #included nodes = %s," + " #source nodes = %s,"
+ " #excluded source nodes = %s," + " #target nodes = %s,"
+ " #excluded target nodes = %s,"
+ " #blockpools = %s," + " run during upgrade = %s,"
+ " sort top nodes = %s," + " limit overUtilized nodes num = %s,"
+ " hot block time interval = %s]",
Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
threshold, maxIdleIteration, excludedNodes.size(),
includedNodes.size(), sourceNodes.size(), blockpools.size(),
includedNodes.size(), sourceNodes.size(), excludedSourceNodes.size(), targetNodes.size(),
excludedTargetNodes.size(), blockpools.size(),
runDuringUpgrade, sortTopNodes, limitOverUtilizedNum, hotBlockTimeInterval);
}

Expand All @@ -144,6 +177,9 @@ static class Builder {
private Set<String> excludedNodes = Collections.<String> emptySet();
private Set<String> includedNodes = Collections.<String> emptySet();
private Set<String> sourceNodes = Collections.<String> emptySet();
private Set<String> excludedSourceNodes = Collections.<String> emptySet();
private Set<String> targetNodes = Collections.<String> emptySet();
private Set<String> excludedTargetNodes = Collections.<String> emptySet();
private Set<String> blockpools = Collections.<String> emptySet();
private boolean runDuringUpgrade = false;
private boolean runAsService = false;
Expand Down Expand Up @@ -189,6 +225,21 @@ Builder setSourceNodes(Set<String> nodes) {
return this;
}

Builder setExcludedSourceNodes(Set<String> nodes) {
this.excludedSourceNodes = nodes;
return this;
}

Builder setTargetNodes(Set<String> nodes) {
this.targetNodes = nodes;
return this;
}

Builder setExcludedTargetNodes(Set<String> nodes) {
this.excludedTargetNodes = nodes;
return this;
}

Builder setBlockpools(Set<String> pools) {
this.blockpools = pools;
return this;
Expand Down
Loading

0 comments on commit 9657276

Please sign in to comment.