[HOPS-995] Datanode doing full block report when starting a new namenode in multi-namenode setup
smkniazi authored and Gautier Berthou committed Jan 11, 2019
1 parent 69d919b commit b5d7738
Showing 4 changed files with 225 additions and 3 deletions.
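In brief: previously every BPServiceActor scheduled a full block report as part of registering with its namenode, so starting an additional namenode made every datanode resend its complete block list. This commit records, per actor, whether the heartbeat connection to the namenode is live (connectedToNN), and register() now skips the initial full report when a sibling actor is already connected. Below is a minimal, illustrative sketch of that decision using simplified, hypothetical names, not the actual classes changed in the diff:

// Illustrative sketch only: simplified stand-ins for BPOfferService and
// BPServiceActor, not the real Hadoop classes changed below.
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class InitialBlockReportGuard {

  // Mirrors one BPServiceActor: a single namenode connection.
  static class ActorState {
    volatile boolean connected; // set true after a successful heartbeat round
  }

  // Mirrors BPOfferService's list of actors, one per namenode.
  private final List<ActorState> actors = new CopyOnWriteArrayList<>();

  void addActor(ActorState actor) {
    actors.add(actor);
  }

  // Called when 'registering' finishes registering with its namenode.
  // A full block report is needed only if no sibling actor already has a
  // live connection: an already-connected namenode has processed this
  // datanode's blocks, so a fresh full report would be redundant.
  boolean shouldSendInitialBlockReport(ActorState registering) {
    for (ActorState a : actors) {
      if (a != registering && a.connected) {
        return false;
      }
    }
    return true;
  }
}

For example, with two actors where the first is already connected, shouldSendInitialBlockReport returns false for the second; this is the behavior the new TestHABlockReports test asserts by counting the new BlockReportCount metric. The real implementation guards the scan with BPOfferService's read lock; the CopyOnWriteArrayList here is only to keep the sketch self-contained.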
BPOfferService.java
@@ -68,7 +68,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import static org.apache.hadoop.hdfs.server.datanode.BPServiceActor.LOG;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReport;

@@ -553,6 +552,22 @@ void scheduleBlockReport(long delay) {
scheduleBlockReportInt(delay);
}

/**
 * Returns true if any actor other than {@code skip} currently has a
 * working connection to a namenode.
 */
public boolean otherActorsConnectedToNNs(BPServiceActor skip) {
  readLock();
  try {
    for (BPServiceActor actor : bpServices) {
      if (actor != skip && actor.connectedToNN()) {
        return true;
      }
    }
    return false;
  } finally {
    readUnlock();
  }
}

/**
* Ask each of the actors to report a bad block hosted on another DN.
*/
@@ -1135,6 +1150,7 @@ List<DatanodeCommand> blockReport() throws IOException {
long brSendCost = now() - brSendStartTime;
long brCreateCost = brSendStartTime - brCreateStartTime;
dn.getMetrics().addBlockReport(brSendCost);
dn.getMetrics().incrBlockReportCounter(numReportsSent);
LOG.info("Sent " + numReportsSent + " blockreports " + totalBlockCount +
" blocks total. Took " + brCreateCost +
" msec to generate and " + brSendCost +
BPServiceActor.java
@@ -22,7 +22,6 @@
import io.hops.leader_election.node.SortedActiveNodeList;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -46,6 +45,7 @@
import org.apache.hadoop.util.VersionUtil;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.Collection;
@@ -86,6 +86,8 @@ static enum RunningState {

private final Object waitForHeartBeats = new Object();

// Written by this actor's offerService() thread and read by other actors'
// threads via BPOfferService, so volatile for cross-thread visibility.
private volatile boolean connectedToNN = false;

BPServiceActor(InetSocketAddress nnAddr, BPOfferService bpos) {
this.bpos = bpos;
this.dn = bpos.getDataNode();
@@ -396,6 +398,8 @@ private void offerService() throws Exception {
waitForHeartBeats.wait(waitTime);
}

// No exception was thrown, so the heartbeat exchange succeeded and this
// actor is connected to its namenode.
connectedToNN = true;
} catch (RemoteException re) {
String reClass = re.getClassName();
if (UnregisteredNodeException.class.getName().equals(reClass) ||
@@ -416,6 +420,8 @@ private void offerService() throws Exception {
LOG.warn("OfferService interrupted", e);
} catch (IOException e) {
LOG.warn("IOException in offerService", e);
// The RPC failed, so this actor is no longer connected to the namenode.
connectedToNN = false;
}
} // while (shouldRun())
} // offerService
@@ -452,7 +458,13 @@ void register() throws IOException {
bpos.registrationSucceeded(this, bpRegistration);

// Schedule a full block report only if this datanode is not already
// connected to another namenode; an already-connected namenode has
// processed this datanode's blocks, so a fresh full report is redundant.
if (!bpos.otherActorsConnectedToNNs(this)) {
  // random short delay - helps scatter the BRs from all DNs
  bpos.scheduleBlockReport(dnConf.initialBlockReportDelay);
} else {
  LOG.info("Block report skipped because other BPServiceActors are already connected to the namenodes");
}
}


@@ -643,4 +655,8 @@ public byte[] getSmallFileDataFromNN(int id) throws IOException {
return null;
}
}

/** @return true if this actor currently has a working connection to its namenode. */
public boolean connectedToNN() {
  return connectedToNN;
}
}
DataNodeMetrics.java
@@ -99,6 +99,8 @@ public class DataNodeMetrics {
MutableRate heartbeats;
@Metric
MutableRate blockReports;
@Metric
MutableCounterLong blockReportCount;
@Metric MutableRate cacheReports;
@Metric
MutableRate packetAckRoundTripTimeNanos;
@@ -190,6 +192,11 @@ public void addBlockReport(long latency) {
blockReports.add(latency);
}

public void incrBlockReportCounter(int increment) {
  blockReportCount.incr(increment);
}

public void addCacheReport(long latency) {
cacheReports.add(latency);
}
TestHABlockReports.java
@@ -0,0 +1,183 @@
/*
* Copyright (C) 2015 hops.io.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;

public class TestHABlockReports extends junit.framework.TestCase {

public static final Log LOG = LogFactory.getLog(TestHABlockReports.class);

Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
int NUM_NAMENODES = 2;
int NUM_DATANODES = 3;


public void setConfig() {
conf.setInt(DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, /*default 15*/ 1);
conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY, /*default 10*/ 1);
conf.setInt(DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY, /*default 500*/ 500);
conf.setInt(DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY, /*default 15000*/1000);
conf.setInt(DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_KEY, /*default 0*/ 0);
conf.setInt(DFSConfigKeys.DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
/*default 0*/0);
conf.setInt(DFSConfigKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, /*default
45*/ 2);
conf.setInt(DFSConfigKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, /*default 10*/ 1);
conf.set(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY, "1000,2");
}

/**
 * Tests block reporting in a multi-namenode setup. When a new namenode
 * is started, the datanodes connect to it. However, there is no need to
 * send a complete block report to the new namenode because the existing
 * namenodes have already processed the block reports from the datanodes.
 */
@Test(timeout = 900000)
public void testMultiNNBlockReports() throws IOException, TimeoutException, InterruptedException {

final int NN1 = 0, NN2 = 1;
if (NUM_NAMENODES < 2) {
NUM_NAMENODES = 2;
}

try {
setConfig();
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHOPSTopology(NUM_NAMENODES))
.numDataNodes(NUM_DATANODES).build();
cluster.waitActive();

Thread.sleep(10000); // make sure that all block reports are in

assertEquals("Wrong number of block reports", 3, getTotalBRSent(cluster));

LOG.info("Cluster started. Total Block reports: " + getTotalBRSent(cluster));

//kill one namenode
cluster.shutdownNameNode(NN2); // now the cluster is running with 1 NN
LOG.info("Second namenode is killed");

// create some files and change the state of the datanodes
DistributedFileSystem fs = cluster.getFileSystem(0);
for (int i = 0; i < 5; i++) {
FSDataOutputStream out = fs.create(new Path("/test" + i));
out.write(i);
out.close();
}

LOG.info("Created some files to change the state of the datanodes");

assertEquals("Wrong number of block reports", 3, getTotalBRSent(cluster));

cluster.restartNameNode(NN2); // now the cluster is running with 2 NNs again
cluster.waitActive();

Thread.sleep(10000); // give the datanodes time to re-register with the restarted namenode

LOG.info("Restarted the second namenode. Total Block reports: " + getTotalBRSent(cluster));
// still 3: the datanodes are already connected to the first namenode,
// so the full block report to the restarted namenode is skipped
assertEquals("Wrong number of block reports", 3, getTotalBRSent(cluster));

} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}

/**
 * When all namenodes are shut down and later restarted, no BPServiceActor
 * is still connected, so every datanode must send a full block report again.
 */
@Test(timeout = 900000)
public void testAllDeadNamenodes() throws IOException, TimeoutException, InterruptedException {
final int NN1 = 0, NN2 = 1;
if (NUM_NAMENODES < 2) {
NUM_NAMENODES = 2;
}

try {
setConfig();
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHOPSTopology(NUM_NAMENODES))
.numDataNodes(NUM_DATANODES).build();
cluster.waitActive();

Thread.sleep(10000); // make sure that all block reports are in

assertEquals("Wrong number of block reports", 3, getTotalBRSent(cluster));
LOG.info("Cluster started. Total Block reports: " + getTotalBRSent(cluster));

cluster.shutdownNameNode(NN1);
cluster.shutdownNameNode(NN2);

Thread.sleep(10000); // make sure that all block reports are in

cluster.restartNameNode(NN1, false);
cluster.restartNameNode(NN2, false);
cluster.waitActive();

Thread.sleep(10000); // make sure that all block reports are in

// 3 more block reports are received from the 3 datanodes
assertEquals("Wrong number of block reports", 6, getTotalBRSent(cluster));
LOG.info("Cluster started. Total Block reports: " + getTotalBRSent(cluster));


} finally {
if (cluster != null) {
cluster.shutdown();
}
}

}

/** Sums the new BlockReportCount metric over all datanodes in the cluster. */
private long getTotalBRSent(MiniDFSCluster cluster) {
long counter = 0;
List<DataNode> datanodes = cluster.getDataNodes();
for (DataNode dn : datanodes) {
MetricsRecordBuilder rb = MetricsAsserts.getMetrics(dn.getMetrics().name());
counter += MetricsAsserts.getLongCounter("BlockReportCount", rb);
}
return counter;
}
}
