diff --git a/hadoop-tools/hadoop-distcp/pom.xml b/hadoop-tools/hadoop-distcp/pom.xml index cbdce3d76f5762..d9674804484e8e 100644 --- a/hadoop-tools/hadoop-distcp/pom.xml +++ b/hadoop-tools/hadoop-distcp/pom.xml @@ -114,6 +114,12 @@ assertj-core test + + org.hamcrest + hamcrest-library + 1.3 + test + diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java index 4d3676a66941a7..66cac955da8eb0 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java @@ -120,12 +120,11 @@ public long getTotalBytesRead() { * @return Read rate, in bytes/sec. */ public long getBytesPerSec() { - long elapsed = (System.currentTimeMillis() - startTime) / 1000; - if (elapsed == 0) { - return bytesRead; - } else { - return bytesRead / elapsed; + if (bytesRead == 0){ + return 0; } + float elapsed = (System.currentTimeMillis() - startTime) / 1000.0f; + return (long) (bytesRead / elapsed); } /** diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java index 6a572177192d94..295e0ab83139eb 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java @@ -22,6 +22,8 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.io.IOUtils; import org.junit.Assert; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertThat; import org.junit.Test; import java.io.*; @@ -67,6 +69,34 @@ public void testRead() { } } + @Test + public void testThrottleRead() { + File[] srcFiles = new File[100]; + File destFile; + try { + destFile = createFile(100 * 50 * 1024); + destFile.deleteOnExit(); + // create file + for (int i = 0; i < srcFiles.length; i++) { + srcFiles[i] = createFile(48 * 1024); + srcFiles[i].deleteOnExit(); + } + + // copy srcFiles + long begin = System.currentTimeMillis(); + LOG.info("begin: " + begin); + for (File srcFile : srcFiles) { + LOG.info("fileLength: " + srcFiles.length); + copyAndAssert(srcFile, destFile, 50, 1, 0, CB.BUFFER); + } + long end = System.currentTimeMillis(); + LOG.info("end: " + end); + assertThat((int) (end - begin) / 1000, greaterThanOrEqualTo(100)); + } catch (IOException e) { + LOG.error("Exception encountered ", e); + } + } + private long copyAndAssert(File tmpFile, File outFile, long maxBandwidth, float factor, int sleepTime, CB flag) throws IOException {