From fdef943fcc14e4b4a954cf7af707db70a4275f72 Mon Sep 17 00:00:00 2001
From: Sammi Chen
Date: Wed, 6 Nov 2024 12:50:00 +0800
Subject: [PATCH] HDDS-11622. Support domain socket creation (#7397)

---
 .../hadoop/hdds/scm/OzoneClientConfig.java    |  54 ++++
 .../hadoop/hdds/scm/storage/DomainPeer.java   | 122 ++++++++
 .../hdds/scm/storage/DomainSocketFactory.java | 296 ++++++++++++++++++++
 3 files changed, 472 insertions(+)
 create mode 100644 hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainPeer.java
 create mode 100644 hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainSocketFactory.java

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 5426bbc49817..3497f3359fc1 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -39,6 +39,60 @@ public class OzoneClientConfig {
   private static final Logger LOG =
       LoggerFactory.getLogger(OzoneClientConfig.class);
 
+  public static final boolean OZONE_READ_SHORT_CIRCUIT_DEFAULT = false;
+  public static final String OZONE_DOMAIN_SOCKET_PATH = "ozone.domain.socket.path";
+  public static final String OZONE_DOMAIN_SOCKET_PATH_DEFAULT = "/var/lib/ozone/dn_socket";
+  public static final String SHORT_CIRCUIT_PREFIX = "read.short-circuit.";
+  public static final short DATA_TRANSFER_VERSION = 28;
+  public static final byte DATA_TRANSFER_MAGIC_CODE = 99;
+
+  @Config(key = "read.short-circuit",
+      defaultValue = "false",
+      type = ConfigType.BOOLEAN,
+      description = "Whether read short-circuit is enabled or not",
+      tags = { ConfigTag.CLIENT, ConfigTag.DATANODE })
+  private boolean shortCircuitEnabled = OZONE_READ_SHORT_CIRCUIT_DEFAULT;
+
+  @Config(key = SHORT_CIRCUIT_PREFIX + "buffer.size",
+      defaultValue = "128KB",
+      type = ConfigType.SIZE,
+      description = "Buffer size of reader/writer.",
+      tags = { ConfigTag.CLIENT, ConfigTag.DATANODE })
+  private int shortCircuitBufferSize = 128 * 1024;
+
+  @Config(key = SHORT_CIRCUIT_PREFIX + "disable.interval",
+      defaultValue = "600",
+      type = ConfigType.LONG,
+      description = "If some unknown IO error happens on Domain socket read, short circuit read will be disabled " +
+          "temporarily for this period of time(seconds).",
+      tags = { ConfigTag.CLIENT })
+  private long shortCircuitReadDisableInterval = 60 * 10;
+
+  public boolean isShortCircuitEnabled() {
+    return shortCircuitEnabled;
+  }
+
+  public void setShortCircuit(boolean enabled) {
+    shortCircuitEnabled = enabled;
+  }
+
+
+  public int getShortCircuitBufferSize() {
+    return shortCircuitBufferSize;
+  }
+
+  public void setShortCircuitBufferSize(int size) {
+    this.shortCircuitBufferSize = size;
+  }
+
+  public long getShortCircuitReadDisableInterval() {
+    return shortCircuitReadDisableInterval;
+  }
+
+  public void setShortCircuitReadDisableInterval(long value) {
+    shortCircuitReadDisableInterval = value;
+  }
+
   /**
    * Enum for indicating what mode to use when combining chunk and block
    * checksums to define an aggregate FileChecksum. This should be considered
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainPeer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainPeer.java
new file mode 100644
index 000000000000..3fcebb7f0b73
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainPeer.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+
+/**
+ * Represents a peer that we communicate with by using blocking I/O
+ * on a UNIX domain socket.
+ */
+public class DomainPeer implements Closeable {
+  private final DomainSocket socket;
+  private final OutputStream out;
+  private final InputStream in;
+  private final ReadableByteChannel channel;
+  public static final Logger LOG = LoggerFactory.getLogger(DomainPeer.class);
+
+  /**
+   * Wraps a domain socket and caches its output stream, input stream and channel.
+   *
+   * @param socket the domain socket to communicate over.
+   */
+  public DomainPeer(DomainSocket socket) {
+    this.socket = socket;
+    this.out = socket.getOutputStream();
+    this.in = socket.getInputStream();
+    this.channel = socket.getChannel();
+  }
+
+  public ReadableByteChannel getInputStreamChannel() {
+    return channel;
+  }
+
+  public void setReadTimeout(int timeoutMs) throws IOException {
+    socket.setAttribute(DomainSocket.RECEIVE_TIMEOUT, timeoutMs);
+  }
+
+  public int getReceiveBufferSize() throws IOException {
+    return socket.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
+  }
+
+  public void setWriteTimeout(int timeoutMs) throws IOException {
+    socket.setAttribute(DomainSocket.SEND_TIMEOUT, timeoutMs);
+  }
+
+  public boolean isClosed() {
+    return !socket.isOpen();
+  }
+
+  /**
+   * Closes the underlying domain socket.
+   *
+   * @throws IOException if closing the socket fails.
+   */
+  @Override
+  public void close() throws IOException {
+    socket.close();
+    LOG.info("{} is closed", socket);
+  }
+
+  public String getRemoteAddressString() {
+    return "unix:{" + socket.toString() + "}";
+  }
+
+  public String getLocalAddressString() {
+    return "";
+  }
+
+  public InputStream getInputStream() throws IOException {
+    return in;
+  }
+
+  public OutputStream getOutputStream() throws IOException {
+    return out;
+  }
+
+  @Override
+  public String toString() {
+    return "DomainPeer(" + getRemoteAddressString() + ")";
+  }
+
+  public DomainSocket getDomainSocket() {
+    return socket;
+  }
+
+  public boolean hasSecureChannel() {
+    //
+    // Communication over domain sockets is assumed to be secure, since it
+    // doesn't pass over any network. We also carefully control the privileges
+    // that can be used on the domain socket inode and its parent directories.
+    // See #{java.org.apache.hadoop.net.unix.DomainSocket#validateSocketPathSecurity0}
+    // for details.
+    //
+    // So unless you are running as root or the user launches the service, you cannot
+    // launch a man-in-the-middle attack on UNIX domain socket traffic.
+    //
+    return true;
+  }
+}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainSocketFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainSocketFactory.java
new file mode 100644
index 000000000000..e62e2a6bfd2a
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainSocketFactory.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A factory to help create DomainSocket.
+ */
+public final class DomainSocketFactory implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DomainSocketFactory.class);
+
+  /**
+   * Domain socket path state.
+   */
+  public enum PathState {
+    NOT_CONFIGURED(false),
+    DISABLED(false),
+    VALID(true);
+
+    PathState(boolean usableForShortCircuit) {
+      this.usableForShortCircuit = usableForShortCircuit;
+    }
+
+    public boolean getUsableForShortCircuit() {
+      return usableForShortCircuit;
+    }
+    private final boolean usableForShortCircuit;
+  }
+
+  /**
+   * Domain socket path.
+   */
+  public static class PathInfo {
+    private static final PathInfo NOT_CONFIGURED = new PathInfo("", PathState.NOT_CONFIGURED);
+    private static final PathInfo DISABLED = new PathInfo("", PathState.DISABLED);
+    private static final PathInfo VALID = new PathInfo("", PathState.VALID);
+
+    private final String path;
+    private final PathState state;
+
+    PathInfo(String path, PathState state) {
+      this.path = path;
+      this.state = state;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public PathState getPathState() {
+      return state;
+    }
+
+    @Override
+    public String toString() {
+      return "PathInfo{path=" + path + ", state=" + state + "}";
+    }
+  }
+
+  public static final String FEATURE = "short-circuit reads";
+  public static final String FEATURE_FLAG = "SC";
+  private static boolean nativeLibraryLoaded = false;
+  private static String nativeLibraryLoadFailureReason;
+  private long pathExpireMills;
+  private final ConcurrentHashMap<String, PathInfo> pathMap;
+  private Timer timer;
+  private boolean isEnabled = false;
+  private String domainSocketPath;
+
+  static {
+    // Try to load native hadoop library and set fallback flag appropriately
+    if (SystemUtils.IS_OS_WINDOWS) {
+      nativeLibraryLoadFailureReason = "UNIX Domain sockets are not available on Windows.";
+    } else {
+      LOG.info("Trying to load the custom-built native-hadoop library...");
+      try {
+        System.loadLibrary("hadoop");
+        LOG.info("Loaded the native-hadoop library");
+        nativeLibraryLoaded = true;
+      } catch (Throwable t) {
+        // Ignore failure to continue
+        LOG.info("Failed to load native-hadoop with error: " + t);
+        LOG.info("java.library.path=" + System.getProperty("java.library.path"));
+        nativeLibraryLoadFailureReason = "libhadoop cannot be loaded.";
+      }
+
+      if (!nativeLibraryLoaded) {
+        LOG.warn("Unable to load native-hadoop library for your platform... " +
+            "using builtin-java classes where applicable");
+      }
+    }
+  }
+
+  private static volatile DomainSocketFactory instance = null;
+
+  /**
+   * Returns the shared factory instance, creating it from conf on first use.
+   * The conf passed by later callers is ignored once the instance exists.
+   *
+   * @param conf configuration used to build the factory on the first call.
+   * @return the shared DomainSocketFactory instance.
+   */
+  public static DomainSocketFactory getInstance(ConfigurationSource conf) {
+    if (instance == null) {
+      synchronized (DomainSocketFactory.class) {
+        if (instance == null) {
+          instance = new DomainSocketFactory(conf);
+        }
+      }
+    }
+    return instance;
+  }
+
+  private DomainSocketFactory(ConfigurationSource conf) {
+    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+    boolean shortCircuitEnabled = clientConfig.isShortCircuitEnabled();
+    PathInfo pathInfo;
+    long startTime = System.nanoTime();
+    if (!shortCircuitEnabled) {
+      LOG.info(FEATURE + " is disabled.");
+      pathInfo = PathInfo.NOT_CONFIGURED;
+    } else {
+      domainSocketPath = conf.get(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH,
+          OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH_DEFAULT);
+      if (domainSocketPath.isEmpty()) {
+        throw new IllegalArgumentException(FEATURE + " is enabled but "
+            + OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH + " is not set.");
+      } else if (!nativeLibraryLoaded) {
+        LOG.warn(FEATURE + " cannot be used because " + nativeLibraryLoadFailureReason);
+        pathInfo = PathInfo.DISABLED;
+      } else {
+        pathInfo = PathInfo.VALID;
+        isEnabled = true;
+        // Daemon timer thread, so a factory that is never closed cannot keep the JVM alive.
+        timer = new Timer(DomainSocketFactory.class.getSimpleName() + "-Timer", true);
+        LOG.info(FEATURE + " is enabled within {} ns.", System.nanoTime() - startTime);
+      }
+    }
+    pathExpireMills = clientConfig.getShortCircuitReadDisableInterval() * 1000;
+    pathMap = new ConcurrentHashMap<>();
+    // ConcurrentHashMap rejects null keys; the path is only set when the feature is enabled.
+    if (domainSocketPath != null) {
+      pathMap.put(domainSocketPath, pathInfo);
+    }
+  }
+
+  public boolean isServiceEnabled() {
+    return isEnabled;
+  }
+
+  public boolean isServiceReady() {
+    if (isEnabled) {
+      PathInfo status = pathMap.get(domainSocketPath);
+      // The entry can be missing after clearPathMap(); treat that as not ready.
+      return status != null && status.getPathState() == PathState.VALID;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Get information about a domain socket path. Caller must make sure that addr is a local address.
+   *
+   * @param addr The local inet address to use.
+   * @return Information about the socket path.
+   */
+  public PathInfo getPathInfo(InetSocketAddress addr) {
+    if (!isEnabled) {
+      return PathInfo.NOT_CONFIGURED;
+    }
+
+    if (!isServiceReady()) {
+      return PathInfo.DISABLED;
+    }
+
+    String escapedPath = DomainSocket.getEffectivePath(domainSocketPath, addr.getPort());
+    return pathMap.computeIfAbsent(escapedPath, path -> new PathInfo(path, PathState.VALID));
+  }
+
+  /**
+   * Create DomainSocket for addr. Caller must make sure that addr is a local address.
+   * On failure the effective path is marked disabled and re-enabled after the configured interval.
+   *
+   * @param readTimeoutMs  receive timeout set on the socket, in milliseconds.
+   * @param writeTimeoutMs send timeout set on the socket, in milliseconds.
+   * @param addr           address whose port is used to derive the effective socket path.
+   * @return the connected socket, or null if short-circuit is not enabled or not ready.
+   * @throws IOException if connecting or configuring the socket fails.
+   */
+  public DomainSocket createSocket(int readTimeoutMs, int writeTimeoutMs, InetSocketAddress addr) throws IOException {
+    if (!isEnabled || !isServiceReady()) {
+      return null;
+    }
+    boolean success = false;
+    DomainSocket sock = null;
+    String escapedPath = null;
+    long startTime = System.nanoTime();
+    try {
+      escapedPath = DomainSocket.getEffectivePath(domainSocketPath, addr.getPort());
+      sock = DomainSocket.connect(escapedPath);
+      sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, readTimeoutMs);
+      sock.setAttribute(DomainSocket.SEND_TIMEOUT, writeTimeoutMs);
+      success = true;
+      LOG.info("{} is created within {} ns", sock, System.nanoTime() - startTime);
+    } catch (IOException e) {
+      LOG.error("Failed to create DomainSocket", e);
+      throw e;
+    } finally {
+      if (!success) {
+        if (sock != null) {
+          IOUtils.closeQuietly(sock);
+        }
+        if (escapedPath != null) {
+          pathMap.put(escapedPath, PathInfo.DISABLED);
+          LOG.error("{} is disabled for {} ms due to current failure", escapedPath, pathExpireMills);
+          schedulePathEnable(escapedPath, pathExpireMills);
+        }
+        sock = null;
+      }
+    }
+    return sock;
+  }
+
+  /**
+   * Disable the configured domain socket path and schedule it to be re-enabled
+   * after the configured interval. Does nothing when short-circuit is not enabled.
+   */
+  public void disableShortCircuit() {
+    if (!isEnabled) {
+      // No timer exists in this state, and there is nothing to disable.
+      return;
+    }
+    pathMap.put(domainSocketPath, PathInfo.DISABLED);
+    schedulePathEnable(domainSocketPath, pathExpireMills);
+  }
+
+  /**
+   * Schedule path to be marked valid again after delayMills milliseconds.
+   */
+  private void schedulePathEnable(String path, long delayMills) {
+    timer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        // Keep the real path in the entry so getPathInfo() callers do not see an empty path.
+        pathMap.put(path, new PathInfo(path, PathState.VALID));
+      }
+    }, delayMills);
+  }
+
+  @VisibleForTesting
+  public void clearPathMap() {
+    pathMap.clear();
+  }
+
+  public long getPathExpireMills() {
+    return pathExpireMills;
+  }
+
+  @Override
+  public void close() {
+    if (timer != null) {
+      timer.cancel();
+    }
+  }
+}