Skip to content

Commit

Permalink
HDDS-11627. Support getBlock operation on short-circuit channel.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChenSammi committed Nov 14, 2024
1 parent 15ee286 commit e04c83b
Show file tree
Hide file tree
Showing 49 changed files with 3,370 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.DomainSocketFactory;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
Expand All @@ -41,6 +43,8 @@ public static void enableErrorInjection(ErrorInjector injector) {
private final boolean topologyAwareRead;
private final ClientTrustManager trustManager;
private final boolean securityEnabled;
private boolean shortCircuitEnabled;
private DomainSocketFactory domainSocketFactory;

public XceiverClientCreator(ConfigurationSource conf) {
this(conf, null);
Expand All @@ -56,13 +60,25 @@ public XceiverClientCreator(ConfigurationSource conf, ClientTrustManager trustMa
if (securityEnabled) {
Preconditions.checkNotNull(trustManager);
}
shortCircuitEnabled = conf.getObject(OzoneClientConfig.class).isShortCircuitEnabled();
if (shortCircuitEnabled) {
domainSocketFactory = DomainSocketFactory.getInstance(conf);
}
}

public boolean isSecurityEnabled() {
return securityEnabled;
}

public boolean isShortCircuitEnabled() {
return shortCircuitEnabled && domainSocketFactory.isServiceReady();
}

protected XceiverClientSpi newClient(Pipeline pipeline) throws IOException {
return newClient(pipeline, null);
}

protected XceiverClientSpi newClient(Pipeline pipeline, DatanodeDetails dn) throws IOException {
XceiverClientSpi client;
switch (pipeline.getType()) {
case RATIS:
Expand All @@ -74,6 +90,9 @@ protected XceiverClientSpi newClient(Pipeline pipeline) throws IOException {
case EC:
client = new ECXceiverClientGrpc(pipeline, conf, trustManager);
break;
case SHORT_CIRCUIT:
client = new XceiverClientShortCircuit(pipeline, conf, dn);
break;
case CHAINED:
default:
throw new IOException("not implemented " + pipeline.getType());
Expand All @@ -97,7 +116,14 @@ public void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClie
}

@Override
public XceiverClientSpi acquireClientForReadData(Pipeline pipeline) throws IOException {
public XceiverClientSpi acquireClientForReadData(Pipeline pipeline, boolean allowShortCircuit)
throws IOException {
return acquireClient(pipeline);
}

@Override
public XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware, boolean allowShortCircuit)
throws IOException {
return acquireClient(pipeline);
}

Expand All @@ -117,7 +143,10 @@ public void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClie
}

@Override
public void close() throws Exception {
// clients are not tracked, closing each client is the responsibility of users of this class
public void close() {
// clients are not tracked, closing each client is the responsibility of users of this classclass
if (domainSocketFactory != null) {
domainSocketFactory.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ public interface XceiverClientFactory extends AutoCloseable {
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
XceiverClientSpi acquireClientForReadData(Pipeline pipeline)
throws IOException;
default XceiverClientSpi acquireClientForReadData(Pipeline pipeline)
throws IOException {
return acquireClientForReadData(pipeline, false);
}

/**
* Releases a read XceiverClientSpi after use.
Expand All @@ -73,10 +75,20 @@ void releaseClientForReadData(XceiverClientSpi client,
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware)
default XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware) throws IOException {
return acquireClient(pipeline, topologyAware, false);
}

XceiverClientSpi acquireClientForReadData(Pipeline pipeline, boolean allowShortCircuit)
throws IOException;

XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware, boolean allowShortCircuit)
throws IOException;

void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient,
boolean topologyAware);

default boolean isShortCircuitEnabled() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
* how it works, and how it is integrated with the Ozone client.
*/
public class XceiverClientGrpc extends XceiverClientSpi {
private static final Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(XceiverClientGrpc.class);
private final Pipeline pipeline;
private final ConfigurationSource config;
Expand Down Expand Up @@ -133,6 +133,7 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
this.trustManager = trustManager;
this.getBlockDNcache = new ConcurrentHashMap<>();
LOG.info("{} is created for pipeline {}", XceiverClientGrpc.class.getSimpleName(), pipeline);
}

/**
Expand Down Expand Up @@ -246,6 +247,10 @@ public synchronized void close() {
}
}

public boolean isClosed() {
return closed;
}

@Override
public Pipeline getPipeline() {
return pipeline;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@
package org.apache.hadoop.hdds.scm;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigType;
Expand All @@ -29,6 +34,9 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.DomainSocketFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.util.OzoneNetUtils;
import org.apache.hadoop.security.UserGroupInformation;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -38,6 +46,7 @@
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.hadoop.hdds.DatanodeVersion.SHORT_CIRCUIT_READS;
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
import static org.apache.hadoop.hdds.conf.ConfigTag.PERFORMANCE;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.NO_REPLICA_FOUND;
Expand All @@ -64,8 +73,8 @@ public class XceiverClientManager extends XceiverClientCreator {

private final Cache<String, XceiverClientSpi> clientCache;
private final CacheMetrics cacheMetrics;

private static XceiverClientMetrics metrics;
private final ConcurrentHashMap<String, DatanodeDetails> localDNCache;

/**
* Creates a new XceiverClientManager for non secured ozone cluster.
Expand Down Expand Up @@ -105,6 +114,7 @@ public void onRemoval(
}).build();

cacheMetrics = CacheMetrics.create(clientCache, this);
this.localDNCache = new ConcurrentHashMap<>();
}

@VisibleForTesting
Expand All @@ -117,17 +127,54 @@ public Cache<String, XceiverClientSpi> getClientCache() {
*
* If there is already a cached XceiverClientSpi, simply return
* the cached otherwise create a new one.
*
* @param pipeline the container pipeline for the client connection
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
@Override
public XceiverClientSpi acquireClient(Pipeline pipeline)
throws IOException {
return acquireClient(pipeline, false, false);
}

/**
* Acquires a XceiverClientSpi connected to a container for read.
*
* If there is already a cached XceiverClientSpi, simply return
* the cached otherwise create a new one.
*
* @param pipeline the container pipeline for the client connection
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
@Override
public XceiverClientSpi acquireClientForReadData(Pipeline pipeline, boolean allowShortCircuit)
throws IOException {
return acquireClient(pipeline, false, allowShortCircuit);
}

/**
* Acquires a XceiverClientSpi connected to a container capable of
* storing the specified key.
*
* If there is already a cached XceiverClientSpi, simply return
* the cached otherwise create a new one.
*
* @param pipeline the container pipeline for the client connection
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
@Override
public XceiverClientSpi acquireClient(Pipeline pipeline,
boolean topologyAware) throws IOException {
boolean topologyAware, boolean allowShortCircuit) throws IOException {
Preconditions.checkNotNull(pipeline);
Preconditions.checkArgument(pipeline.getNodes() != null);
Preconditions.checkArgument(!pipeline.getNodes().isEmpty(),
NO_REPLICA_FOUND);

synchronized (clientCache) {
XceiverClientSpi info = getClient(pipeline, topologyAware);
XceiverClientSpi info = getClient(pipeline, topologyAware, allowShortCircuit);
info.incrementReference();
return info;
}
Expand All @@ -141,7 +188,7 @@ public void releaseClient(XceiverClientSpi client, boolean invalidateClient,
client.decrementReference();
if (invalidateClient) {
Pipeline pipeline = client.getPipeline();
String key = getPipelineCacheKey(pipeline, topologyAware);
String key = getPipelineCacheKey(pipeline, topologyAware, client instanceof XceiverClientShortCircuit);
XceiverClientSpi cachedClient = clientCache.getIfPresent(key);
if (cachedClient == client) {
clientCache.invalidate(key);
Expand All @@ -150,24 +197,50 @@ public void releaseClient(XceiverClientSpi client, boolean invalidateClient,
}
}

protected XceiverClientSpi getClient(Pipeline pipeline, boolean topologyAware)
protected XceiverClientSpi getClient(Pipeline pipeline, boolean topologyAware, boolean allowShortCircuit)
throws IOException {
try {
// create different client different pipeline node based on
// network topology
String key = getPipelineCacheKey(pipeline, topologyAware);
return clientCache.get(key, () -> newClient(pipeline));
String key = getPipelineCacheKey(pipeline, topologyAware, allowShortCircuit);
if (key.endsWith(DomainSocketFactory.FEATURE_FLAG)) {
final Pipeline newPipeline = Pipeline.newBuilder(pipeline).setReplicationConfig(
ReplicationConfig.fromTypeAndFactor(ReplicationType.SHORT_CIRCUIT,
ReplicationFactor.valueOf(pipeline.getReplicationConfig().getReplication()))).build();
return clientCache.get(key, () -> newClient(newPipeline, localDNCache.get(key)));
} else {
return clientCache.get(key, () -> newClient(pipeline));
}
} catch (Exception e) {
throw new IOException(
"Exception getting XceiverClient: " + e, e);
}
}

private String getPipelineCacheKey(Pipeline pipeline,
boolean topologyAware) {
String key = pipeline.getId().getId().toString() + pipeline.getType();
private String getPipelineCacheKey(Pipeline pipeline, boolean topologyAware, boolean allowShortCircuit) {
String key = pipeline.getId().getId().toString() + "-" + pipeline.getType();
boolean isEC = pipeline.getType() == HddsProtos.ReplicationType.EC;
if (topologyAware || isEC) {
DatanodeDetails localDN = null;

if ((!isEC) && allowShortCircuit && isShortCircuitEnabled()) {
int port = 0;
InetSocketAddress localAddr = null;
for (DatanodeDetails dn : pipeline.getNodes()) {
// read port from the data node, on failure use default configured port.
port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
InetSocketAddress addr = NetUtils.createSocketAddr(dn.getIpAddress(), port);
if (OzoneNetUtils.isAddressLocal(addr) &&
dn.getCurrentVersion() >= SHORT_CIRCUIT_READS.toProtoValue()) {
localAddr = addr;
localDN = dn;
break;
}
}
if (localAddr != null) {
// Find a local DN and short circuit read is enabled
key += "@" + localAddr.getHostName() + ":" + port + "/" + DomainSocketFactory.FEATURE_FLAG;
}
} else if (topologyAware || isEC) {
try {
DatanodeDetails closestNode = pipeline.getClosestNode();
// Pipeline cache key uses host:port suffix to handle
Expand All @@ -185,7 +258,7 @@ private String getPipelineCacheKey(Pipeline pipeline,
// Standalone port is chosen since all datanodes should have a
// standalone port regardless of version and this port should not
// have any collisions.
key += closestNode.getHostName() + closestNode.getPort(
key += closestNode.getHostName() + ":" + closestNode.getPort(
DatanodeDetails.Port.Name.STANDALONE);
} catch (IOException e) {
LOG.error("Failed to get closest node to create pipeline cache key:" +
Expand All @@ -197,12 +270,16 @@ private String getPipelineCacheKey(Pipeline pipeline,
// Append user short name to key to prevent a different user
// from using same instance of xceiverClient.
try {
key += UserGroupInformation.getCurrentUser().getShortUserName();
key = UserGroupInformation.getCurrentUser().getShortUserName() + "@" + key;
} catch (IOException e) {
LOG.error("Failed to get current user to create pipeline cache key:" +
e.getMessage());
}
}
LOG.info("cache key {} for pipeline {}", key, pipeline);
if (localDN != null) {
localDNCache.put(key, localDN);
}
return key;
}

Expand All @@ -211,12 +288,14 @@ private String getPipelineCacheKey(Pipeline pipeline,
*/
@Override
public void close() {
super.close();
//closing is done through RemovalListener
clientCache.invalidateAll();
clientCache.cleanUp();
if (LOG.isDebugEnabled()) {
LOG.debug("XceiverClient cache stats: {}", clientCache.stats());
}
localDNCache.clear();
cacheMetrics.unregister();

if (metrics != null) {
Expand Down Expand Up @@ -263,7 +342,7 @@ public static class ScmClientConfig {

@Config(key = "idle.threshold",
type = ConfigType.TIME, timeUnit = MILLISECONDS,
defaultValue = "10s",
defaultValue = "300s",
tags = {OZONE, PERFORMANCE},
description =
"In the standalone pipelines, the SCM clients use netty to "
Expand Down
Loading

0 comments on commit e04c83b

Please sign in to comment.