diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 7d7def2f1e930..68ec0f6f21f5f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -195,6 +195,12 @@ public class Balancer { + "\tIncludes only the specified datanodes." + "\n\t[-source [-f | ]]" + "\tPick only the specified datanodes as source nodes." + + "\n\t[-excludeSource [-f | ]]" + + "\tExcludes the specified datanodes to be selected as a source." + + "\n\t[-target [-f | ]]" + + "\tPick only the specified datanodes as target nodes." + + "\n\t[-excludeTarget [-f | ]]" + + "\tExcludes the specified datanodes from being selected as a target." + "\n\t[-blockpools ]" + "\tThe balancer will only run on blockpools included in this list." + "\n\t[-idleiterations ]" @@ -224,6 +230,9 @@ public class Balancer { private final NameNodeConnector nnc; private final BalancingPolicy policy; private final Set sourceNodes; + private final Set excludedSourceNodes; + private final Set targetNodes; + private final Set excludedTargetNodes; private final boolean runDuringUpgrade; private final double threshold; private final long maxSizeToMove; @@ -353,6 +362,9 @@ static int getFailedTimesSinceLastSuccessfulBalance() { this.threshold = p.getThreshold(); this.policy = p.getBalancingPolicy(); this.sourceNodes = p.getSourceNodes(); + this.excludedSourceNodes = p.getExcludedSourceNodes(); + this.targetNodes = p.getTargetNodes(); + this.excludedTargetNodes = p.getExcludedTargetNodes(); this.runDuringUpgrade = p.getRunDuringUpgrade(); this.sortTopNodes = p.getSortTopNodes(); this.limitOverUtilizedNum = p.getLimitOverUtilizedNum(); @@ -411,7 +423,10 @@ private long init(List reports) { long overLoadedBytes = 0L, underLoadedBytes = 0L; for(DatanodeStorageReport r : reports) { final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo()); - final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo()); + final boolean isValidSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo()) && + !Util.isExcluded(excludedSourceNodes, dn.getDatanodeInfo()); + final boolean isValidTarget = Util.isIncluded(targetNodes, dn.getDatanodeInfo()) && + !Util.isExcluded(excludedTargetNodes, dn.getDatanodeInfo()); for(StorageType t : StorageType.getMovableTypes()) { final Double utilization = policy.getUtilization(r, t); if (utilization == null) { // datanode does not have such storage type @@ -419,10 +434,15 @@ private long init(List reports) { } final double average = policy.getAvgUtilization(t); - if (utilization >= average && !isSource) { - LOG.info(dn + "[" + t + "] has utilization=" + utilization - + " >= average=" + average - + " but it is not specified as a source; skipping it."); + if (utilization >= average && !isValidSource) { + LOG.info("{} [{}] utilization {} >= average {}, but it's either not specified" + + " or excluded as a source; skipping.", dn, t, utilization, average); + continue; + } + if (utilization <= average && !isValidTarget) { + LOG.info("{} [{}] utilization {} <= average {}, but it's either not specified" + + " or excluded as a target; skipping.", + dn, t, utilization, average); continue; } @@ -829,6 +849,9 @@ static private int doBalance(Collection namenodes, LOG.info("included nodes = " + p.getIncludedNodes()); LOG.info("excluded nodes = " + p.getExcludedNodes()); LOG.info("source nodes = " + p.getSourceNodes()); + LOG.info("excluded source nodes = " + p.getExcludedSourceNodes()); + LOG.info("target nodes = " + p.getTargetNodes()); + LOG.info("excluded target nodes = " + p.getExcludedTargetNodes()); checkKeytabAndInit(conf); System.out.println("Time Stamp Iteration#" + " Bytes Already Moved Bytes Left To Move Bytes Being Moved" @@ -1016,6 +1039,10 @@ public int run(String[] args) { static BalancerParameters parse(String[] args) { Set excludedNodes = null; Set includedNodes = null; + Set sourceNodes = null; + Set excludedSourceNodes = null; + Set targetNodes = null; + Set excludedTargetNodes = null; BalancerParameters.Builder b = new BalancerParameters.Builder(); if (args != null) { @@ -1056,9 +1083,21 @@ static BalancerParameters parse(String[] args) { i = processHostList(args, i, "include", includedNodes); b.setIncludedNodes(includedNodes); } else if ("-source".equalsIgnoreCase(args[i])) { - Set sourceNodes = new HashSet<>(); + sourceNodes = new HashSet<>(); i = processHostList(args, i, "source", sourceNodes); b.setSourceNodes(sourceNodes); + } else if ("-excludeSource".equalsIgnoreCase(args[i])) { + excludedSourceNodes = new HashSet<>(); + i = processHostList(args, i, "exclude source", excludedSourceNodes); + b.setExcludedSourceNodes(excludedSourceNodes); + } else if ("-target".equalsIgnoreCase(args[i])) { + targetNodes = new HashSet<>(); + i = processHostList(args, i, "target", targetNodes); + b.setTargetNodes(targetNodes); + } else if ("-excludeTarget".equalsIgnoreCase(args[i])) { + excludedTargetNodes = new HashSet<>(); + i = processHostList(args, i, "exclude target", excludedTargetNodes); + b.setExcludedTargetNodes(excludedTargetNodes); } else if ("-blockpools".equalsIgnoreCase(args[i])) { Preconditions.checkArgument( ++i < args.length, @@ -1111,6 +1150,10 @@ static BalancerParameters parse(String[] args) { } Preconditions.checkArgument(excludedNodes == null || includedNodes == null, "-exclude and -include options cannot be specified together."); + Preconditions.checkArgument(excludedSourceNodes == null || sourceNodes == null, + "-excludeSource and -source options cannot be specified together."); + Preconditions.checkArgument(excludedTargetNodes == null || targetNodes == null, + "-excludeTarget and -target options cannot be specified together."); } catch(RuntimeException e) { printUsage(System.err); throw e; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java index 7633be13015ae..35704a3464bba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java @@ -37,6 +37,21 @@ final class BalancerParameters { * source nodes. */ private final Set sourceNodes; + /** + * If empty, any node can be a source; otherwise, these nodes will be excluded as + * source nodes. + */ + private final Set excludedSourceNodes; + /** + * If empty, any node can be a target; otherwise, use only these nodes as + * target nodes. + */ + private final Set targetNodes; + /** + * If empty, any node can be a target; otherwise, these nodes will be excluded as + * target nodes. + */ + private final Set excludedTargetNodes; /** * A set of block pools to run the balancer on. */ @@ -65,6 +80,9 @@ private BalancerParameters(Builder builder) { this.excludedNodes = builder.excludedNodes; this.includedNodes = builder.includedNodes; this.sourceNodes = builder.sourceNodes; + this.excludedSourceNodes = builder.excludedSourceNodes; + this.targetNodes = builder.targetNodes; + this.excludedTargetNodes = builder.excludedTargetNodes; this.blockpools = builder.blockpools; this.runDuringUpgrade = builder.runDuringUpgrade; this.runAsService = builder.runAsService; @@ -97,6 +115,18 @@ Set getSourceNodes() { return this.sourceNodes; } + Set getExcludedSourceNodes() { + return this.excludedSourceNodes; + } + + Set getTargetNodes() { + return this.targetNodes; + } + + Set getExcludedTargetNodes() { + return this.excludedTargetNodes; + } + Set getBlockPools() { return this.blockpools; } @@ -126,12 +156,15 @@ public String toString() { return String.format("%s.%s [%s," + " threshold = %s," + " max idle iteration = %s," + " #excluded nodes = %s," + " #included nodes = %s," + " #source nodes = %s," + + " #excluded source nodes = %s," + " #target nodes = %s," + + " #excluded target nodes = %s," + " #blockpools = %s," + " run during upgrade = %s," + " sort top nodes = %s," + " limit overUtilized nodes num = %s," + " hot block time interval = %s]", Balancer.class.getSimpleName(), getClass().getSimpleName(), policy, threshold, maxIdleIteration, excludedNodes.size(), - includedNodes.size(), sourceNodes.size(), blockpools.size(), + includedNodes.size(), sourceNodes.size(), excludedSourceNodes.size(), targetNodes.size(), + excludedTargetNodes.size(), blockpools.size(), runDuringUpgrade, sortTopNodes, limitOverUtilizedNum, hotBlockTimeInterval); } @@ -144,6 +177,9 @@ static class Builder { private Set excludedNodes = Collections. emptySet(); private Set includedNodes = Collections. emptySet(); private Set sourceNodes = Collections. emptySet(); + private Set excludedSourceNodes = Collections. emptySet(); + private Set targetNodes = Collections. emptySet(); + private Set excludedTargetNodes = Collections. emptySet(); private Set blockpools = Collections. emptySet(); private boolean runDuringUpgrade = false; private boolean runAsService = false; @@ -189,6 +225,21 @@ Builder setSourceNodes(Set nodes) { return this; } + Builder setExcludedSourceNodes(Set nodes) { + this.excludedSourceNodes = nodes; + return this; + } + + Builder setTargetNodes(Set nodes) { + this.targetNodes = nodes; + return this; + } + + Builder setExcludedTargetNodes(Set nodes) { + this.excludedTargetNodes = nodes; + return this; + } + Builder setBlockpools(Set pools) { this.blockpools = pools; return this; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 32b1fa8a5e192..d5b9336af38eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -47,6 +47,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.doAnswer; @@ -473,7 +474,8 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity, static void waitForBalancer(long totalUsedSpace, long totalCapacity, ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p, int expectedExcludedNodes) throws IOException, TimeoutException { - waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes, true); + waitForBalancer(totalUsedSpace, totalCapacity, client, + cluster, p, expectedExcludedNodes, true); } /** @@ -495,6 +497,9 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity, if (!p.getExcludedNodes().isEmpty()) { totalCapacity -= p.getExcludedNodes().size() * CAPACITY; } + if (!p.getExcludedTargetNodes().isEmpty()) { + totalCapacity -= p.getExcludedTargetNodes().size() * CAPACITY; + } final double avgUtilization = ((double)totalUsedSpace) / totalCapacity; boolean balanced; do { @@ -539,6 +544,69 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity, } while (!balanced); } + /** + * Wait until balanced: each datanode gives utilization within. + * Used when testing for included / excluded target and source nodes. + * BALANCE_ALLOWED_VARIANCE of average + * @throws IOException + * @throws TimeoutException + */ + static void waitForBalancer(long totalUsedSpace, long totalCapacity, + ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p, + int expectedExcludedSourceNodes, int expectedExcludedTargetNodes) + throws IOException, TimeoutException { + long timeout = TIMEOUT; + long failtime = (timeout <= 0L) ? Long.MAX_VALUE + : Time.monotonicNow() + timeout; + if (!p.getExcludedTargetNodes().isEmpty()) { + totalCapacity -= p.getExcludedTargetNodes().size() * CAPACITY; + } + final double avgUtilization = ((double)totalUsedSpace) / totalCapacity; + boolean balanced; + do { + DatanodeInfo[] datanodeReport = + client.getDatanodeReport(DatanodeReportType.ALL); + assertEquals(datanodeReport.length, cluster.getDataNodes().size()); + balanced = true; + int actualExcludedSourceNodeCount = 0; + int actualExcludedTargetNodeCount = 0; + for (DatanodeInfo datanode : datanodeReport) { + double nodeUtilization = + ((double) datanode.getDfsUsed() + datanode.getNonDfsUsed()) / + datanode.getCapacity(); + if(Dispatcher.Util.isExcluded(p.getExcludedTargetNodes(), datanode)) { + actualExcludedTargetNodeCount++; + } + if(!Dispatcher.Util.isIncluded(p.getTargetNodes(), datanode)) { + actualExcludedTargetNodeCount++; + } + if(Dispatcher.Util.isExcluded(p.getExcludedSourceNodes(), datanode)) { + actualExcludedSourceNodeCount++; + } + if(!Dispatcher.Util.isIncluded(p.getSourceNodes(), datanode)) { + actualExcludedSourceNodeCount++; + } + if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) { + balanced = false; + if (Time.monotonicNow() > failtime) { + throw new TimeoutException( + "Rebalancing expected avg utilization to become " + + avgUtilization + ", but on datanode " + datanode + + " it remains at " + nodeUtilization + + " after more than " + TIMEOUT + " msec."); + } + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + } + break; + } + } + assertEquals(expectedExcludedSourceNodes, actualExcludedSourceNodeCount); + assertEquals(expectedExcludedTargetNodes, actualExcludedTargetNodeCount); + } while (!balanced); + } + String long2String(long[] array) { if (array.length == 0) { return ""; @@ -636,6 +704,14 @@ int getNumberofExcludeNodes() { } } + private void doTest(Configuration conf, + long newCapacity, String newRack, NewNodeInfo nodes, + boolean useTool, boolean useFile, BalancerParameters p) throws Exception { + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, + newCapacity, 0L, newRack, nodes, + useTool, useFile, false, 0.3, p); + } + private void doTest(Configuration conf, long[] capacities, String[] racks, long newCapacity, String newRack, boolean useTool) throws Exception { doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, false); @@ -645,7 +721,7 @@ private void doTest(Configuration conf, long[] capacities, String[] racks, long newCapacity, String newRack, NewNodeInfo nodes, boolean useTool, boolean useFile) throws Exception { doTest(conf, capacities, racks, newCapacity, 0L, newRack, nodes, - useTool, useFile, false, 0.3); + useTool, useFile, false, 0.3, null); } /** This test start a cluster with specified number of nodes, @@ -671,7 +747,7 @@ private void doTest(Configuration conf, long[] capacities, String[] racks, private void doTest(Configuration conf, long[] capacities, String[] racks, long newCapacity, long newNonDfsUsed, String newRack, NewNodeInfo nodes, boolean useTool, boolean useFile, - boolean useNamesystemSpy, double clusterUtilization) throws Exception { + boolean useNamesystemSpy, double clusterUtilization, BalancerParameters p) throws Exception { LOG.info("capacities = " + long2String(capacities)); LOG.info("racks = " + Arrays.asList(racks)); LOG.info("newCapacity= " + newCapacity); @@ -746,7 +822,7 @@ private void doTest(Configuration conf, long[] capacities, totalNodes-1-i).getDatanodeId().getXferAddr()); } } - //polulate the exclude nodes + //populate the exclude nodes if (nodes.getNumberofExcludeNodes() > 0) { int totalNodes = cluster.getDataNodes().size(); for (int i=0; i < nodes.getNumberofExcludeNodes(); i++) { @@ -756,16 +832,16 @@ private void doTest(Configuration conf, long[] capacities, } } } - // run balancer and validate results - BalancerParameters.Builder pBuilder = - new BalancerParameters.Builder(); - if (nodes != null) { - pBuilder.setExcludedNodes(nodes.getNodesToBeExcluded()); - pBuilder.setIncludedNodes(nodes.getNodesToBeIncluded()); - pBuilder.setRunDuringUpgrade(false); + if(p == null) { + // run balancer and validate results + BalancerParameters.Builder pBuilder = new BalancerParameters.Builder(); + if (nodes != null) { + pBuilder.setExcludedNodes(nodes.getNodesToBeExcluded()); + pBuilder.setIncludedNodes(nodes.getNodesToBeIncluded()); + pBuilder.setRunDuringUpgrade(false); + } + p = pBuilder.build(); } - BalancerParameters p = pBuilder.build(); - int expectedExcludedNodes = 0; if (nodes != null) { if (!nodes.getNodesToBeExcluded().isEmpty()) { @@ -821,6 +897,9 @@ private void runBalancer(Configuration conf, long totalDfsUsedSpace, == 0) { assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), run); return; + } else if(run == ExitStatus.NO_MOVE_BLOCK.getExitCode()) { + LOG.error("Exit status returned: " + run); + throw new Exception(String.valueOf(ExitStatus.NO_MOVE_BLOCK.getExitCode())); } else { assertEquals(ExitStatus.SUCCESS.getExitCode(), run); } @@ -828,8 +907,22 @@ private void runBalancer(Configuration conf, long totalDfsUsedSpace, LOG.info(" ."); try { long totalUsedSpace = totalDfsUsedSpace + totalNonDfsUsedSpace; - waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, - excludedNodes, checkExcludeNodesUtilization); + int expectedExcludedSourceNodes = 0; + int expectedExcludedTargetNodes = 0; + if(!p.getExcludedSourceNodes().isEmpty()) { + expectedExcludedSourceNodes = p.getExcludedSourceNodes().size(); + } + if(!p.getExcludedTargetNodes().isEmpty()) { + expectedExcludedTargetNodes = p.getExcludedTargetNodes().size(); + } + if(expectedExcludedSourceNodes > 0 || expectedExcludedTargetNodes > 0) { + waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, + expectedExcludedSourceNodes, expectedExcludedTargetNodes); + } else { + waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, + excludedNodes, + checkExcludeNodesUtilization); + } } catch (TimeoutException e) { // See HDFS-11682. NN may not get heartbeat to reflect the newest // block changes. @@ -879,8 +972,9 @@ private static int runBalancer(Collection namenodes, b.resetData(conf); if (r.getExitStatus() == ExitStatus.IN_PROGRESS) { done = false; - } else if (r.getExitStatus() != ExitStatus.SUCCESS) { - //must be an error statue, return. + } else if (r.getExitStatus() != ExitStatus.SUCCESS + || r.getExitStatus() != ExitStatus.NO_MOVE_BLOCK) { + //must be an error status, return. return r.getExitStatus().getExitCode(); } else { if (iteration > 0) { @@ -1129,7 +1223,7 @@ public void testBalancer3() throws Exception { Configuration conf = new HdfsConfiguration(); initConf(conf); doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, - CAPACITY, 1000L, RACK2, null, false, false, false, 0.3); + CAPACITY, 1000L, RACK2, null, false, false, false, 0.3, null); } private void testBalancerDefaultConstructor(Configuration conf, @@ -1245,8 +1339,49 @@ public void testBalancerCliParseWithWrongParams() { Balancer.Cli.parse(parameters); fail(reason + " for -source parameter"); } catch (IllegalArgumentException ignored) { - // expected + + } + + parameters = new String[] {"-excludeSource"}; + try { + Balancer.Cli.parse(parameters); + fail(reason + " for -excludeSource parameter"); + } catch (IllegalArgumentException ignored) { + + } + + parameters = new String[] {"-source", "testnode1", "-excludeSource", "testnode2"}; + try { + Balancer.Cli.parse(parameters); + fail("Exception is expected when both -source and -excludeSource are specified"); + } catch (IllegalArgumentException e) { + + } + + parameters = new String[] {"-target"}; + try { + Balancer.Cli.parse(parameters); + fail(reason + " for -target parameter"); + } catch (IllegalArgumentException ignored) { + } + + parameters = new String[] {"-excludeTarget"}; + try { + Balancer.Cli.parse(parameters); + fail(reason + " for -excludeTarget parameter"); + } catch (IllegalArgumentException ignored) { + + } + + parameters = new String[] {"-target", "testnode1", "-excludeTarget", "testnode2"}; + try { + Balancer.Cli.parse(parameters); + fail("Exception expected when both -target and -excludeTarget are specified"); + } catch (IllegalArgumentException e) { + + } + } @Test @@ -1920,7 +2055,7 @@ void testBalancerRPCDelay(int getBlocksMaxQps) throws Exception { // all get block calls, so if two iterations are performed, the duration // also includes the time it took to perform the block move ops in the // first iteration - new PortNumberBasedNodes(1, 0, 0), false, false, true, 0.5); + new PortNumberBasedNodes(1, 0, 0), false, false, true, 0.5, null); assertTrue("Number of getBlocks should be not less than " + getBlocksMaxQps, numGetBlocksCalls.get() >= getBlocksMaxQps); long durationMs = 1 + endGetBlocksTime.get() - startGetBlocksTime.get(); @@ -1932,6 +2067,132 @@ void testBalancerRPCDelay(int getBlocksMaxQps) throws Exception { getBlocksMaxQps, getBlockCallsPerSecond <= getBlocksMaxQps); } + /** + * Test balancer with excluded target nodes. + * One of three added nodes is excluded in the target nodes list. + * Balancer should only move blocks to the two included nodes. + */ + @Test(timeout=100000) + public void testBalancerExcludeTargetNodesNoMoveBlock() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set excludeTargetNodes = new HashSet<>(); + excludeTargetNodes.add("datanodeZ"); + BalancerParameters.Builder pBuilder = new BalancerParameters.Builder(); + pBuilder.setExcludedTargetNodes(excludeTargetNodes); + BalancerParameters p = pBuilder.build(); + Exception exception = assertThrows(Exception.class, () -> { + doTest(conf, CAPACITY, RACK2, + new HostNameBasedNodes(new String[]{"datanodeX", "datanodeY", "datanodeZ"}, + BalancerParameters.DEFAULT.getExcludedNodes(), + BalancerParameters.DEFAULT.getIncludedNodes()), + false, false, p); + }); + + assertTrue(exception.getMessage() + .contains(String.valueOf(ExitStatus.NO_MOVE_BLOCK.getExitCode()))); + } + + /** + * Test balancer with included target nodes. + * Two of three added nodes are included in the target nodes list. + * Balancer should only move blocks to the included nodes. + */ + @Test(timeout=100000) + public void testBalancerIncludeTargetNodesNoMoveBlock() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set includeTargetNodes = new HashSet<>(); + includeTargetNodes.add("datanodeY"); + includeTargetNodes.add("datanodeZ"); + BalancerParameters.Builder pBuilder = new BalancerParameters.Builder(); + pBuilder.setTargetNodes(includeTargetNodes); + BalancerParameters p = pBuilder.build(); + Exception exception = assertThrows(Exception.class, () -> { + doTest(conf, CAPACITY, RACK2, + new HostNameBasedNodes(new String[]{"datanodeX", "datanodeY", "datanodeZ"}, + BalancerParameters.DEFAULT.getExcludedNodes(), + BalancerParameters.DEFAULT.getIncludedNodes()), + false, false, p); + }); + + assertTrue(exception.getMessage() + .contains(String.valueOf(ExitStatus.NO_MOVE_BLOCK.getExitCode()))); + } + + /** + * Test balancer with included target nodes. + * Three of three added nodes are included in the target nodes list. + * Balancer should exit with success code. + */ + @Test(timeout=100000) + public void testBalancerIncludeTargetNodesSuccess() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set includeTargetNodes = new HashSet<>(); + includeTargetNodes.add("datanodeX"); + includeTargetNodes.add("datanodeY"); + includeTargetNodes.add("datanodeZ"); + BalancerParameters.Builder pBuilder = new BalancerParameters.Builder(); + pBuilder.setTargetNodes(includeTargetNodes); + BalancerParameters p = pBuilder.build(); + doTest(conf, CAPACITY, RACK2, + new HostNameBasedNodes(new String[]{"datanodeX", "datanodeY", "datanodeZ"}, + BalancerParameters.DEFAULT.getExcludedNodes(), + BalancerParameters.DEFAULT.getIncludedNodes()), + false, false, p); + } + + /** + * Test balancer with included source nodes. + * Since newly added nodes are the only included source nodes no balancing will occur. + */ + @Test(timeout=100000) + public void testBalancerIncludeSourceNodesNoMoveBlock() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set includeSourceNodes = new HashSet<>(); + includeSourceNodes.add("datanodeX"); + includeSourceNodes.add("datanodeY"); + includeSourceNodes.add("datanodeZ"); + BalancerParameters.Builder pBuilder = new BalancerParameters.Builder(); + pBuilder.setSourceNodes(includeSourceNodes); + BalancerParameters p = pBuilder.build(); + Exception exception = assertThrows(Exception.class, () -> { + doTest(conf, CAPACITY, RACK2, + new HostNameBasedNodes(new String[]{"datanodeX", "datanodeY", "datanodeZ"}, + BalancerParameters.DEFAULT.getExcludedNodes(), + BalancerParameters.DEFAULT.getIncludedNodes()), + false, false, p); + }); + + assertTrue(exception.getMessage() + .contains(String.valueOf(ExitStatus.NO_MOVE_BLOCK.getExitCode()))); + } + + /** + * Test balancer with excluded source nodes. + * Since newly added nodes will not be selected as a source, + * all nodes will be included in balancing. + */ + @Test(timeout=100000) + public void testBalancerExcludeSourceNodes() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set excludeSourceNodes = new HashSet<>(); + excludeSourceNodes.add("datanodeX"); + excludeSourceNodes.add("datanodeY"); + BalancerParameters.Builder pBuilder = new BalancerParameters.Builder(); + pBuilder.setExcludedSourceNodes(excludeSourceNodes); + BalancerParameters p = pBuilder.build(); + doTest(conf, CAPACITY, RACK2, + new HostNameBasedNodes(new String[]{"datanodeX", "datanodeY", "datanodeZ"}, + BalancerParameters.DEFAULT.getExcludedNodes(), + BalancerParameters.DEFAULT.getIncludedNodes()), false, + false, p); + } + + /** * @param args */