Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ChenSammi committed Dec 2, 2024
1 parent c8b110d commit ce8b05e
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ private String getPipelineCacheKey(Pipeline pipeline, boolean topologyAware, boo
// Find a local DN and short circuit read is enabled
key += "@" + localAddr.getHostName() + ":" + port + "/" + DomainSocketFactory.FEATURE_FLAG;
}
} else if (topologyAware || isEC) {
}

if (localDN == null && (topologyAware || isEC)) {
try {
DatanodeDetails closestNode = pipeline.getClosestNode();
// Pipeline cache key uses host:port suffix to handle
Expand Down Expand Up @@ -275,7 +277,7 @@ private String getPipelineCacheKey(Pipeline pipeline, boolean topologyAware, boo
e.getMessage());
}
}
LOG.info("cache key {} for pipeline {}", key, pipeline);

if (localDN != null) {
localDNCache.put(key, localDN);
}
Expand Down Expand Up @@ -341,7 +343,7 @@ public static class ScmClientConfig {

@Config(key = "idle.threshold",
type = ConfigType.TIME, timeUnit = MILLISECONDS,
defaultValue = "300s",
defaultValue = "10s",
tags = {OZONE, PERFORMANCE},
description =
"In the standalone pipelines, the SCM clients use netty to "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Timer;
Expand Down Expand Up @@ -152,6 +153,12 @@ private DomainSocketFactory(ConfigurationSource conf) {
LOG.warn(FEATURE + " cannot be used because " + nativeLibraryLoadFailureReason);
pathInfo = PathInfo.DISABLED;
} else {
File file = new File(domainSocketPath);
if (file.exists()) {
throw new IllegalArgumentException(FEATURE + " is enabled but "
+ OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH + " is an existing " +
(file.isDirectory() ? "directory" : "file"));
}
pathInfo = PathInfo.VALID;
isEnabled = true;
timer = new Timer(DomainSocketFactory.class.getSimpleName() + "-Timer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public enum DatanodeVersion implements ComponentVersion {
COMBINED_PUTBLOCK_WRITECHUNK_RPC(2, "WriteChunk can optionally support " +
"a PutBlock request"),

SHORT_CIRCUIT_READS(3, "Support short-circuit reads."),
SHORT_CIRCUIT_READS(3, "Version with short-circuit read support."),

FUTURE_VERSION(-1, "Used internally in the client when the server side is "
+ " newer and an unknown server version has arrived to the client.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public void testContainerCommandRequestProtoConversion() throws InvalidProtocolB
assertEquals(containerID, request.getContainerID());
assertEquals(datanodeID, request.getDatanodeUuid());
assertEquals(localBlockID, request.getGetBlock().getBlockID().getLocalID());
assertEquals(containerID, request.getGetBlock().getBlockID().getContainerID());
assertEquals(bcsid, request.getGetBlock().getBlockID().getBlockCommitSequenceId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void testCaching(boolean securityEnabled, @TempDir Path metaDir) throws I
HddsProtos.ReplicationFactor.ONE,
OzoneConsts.OZONE);
XceiverClientSpi client1 = clientManager
.acquireClientForReadData(container1.getPipeline(), true);
.acquireClient(container1.getPipeline());
assertEquals(1, client1.getRefcount());
// although allowShortCircuit true when calling acquireClientForReadData,
// XceiverClientGrpc client will be allocated since short-circuit is by default disabled.
Expand All @@ -109,13 +109,12 @@ public void testCaching(boolean securityEnabled, @TempDir Path metaDir) throws I
HddsProtos.ReplicationFactor.THREE,
OzoneConsts.OZONE);
XceiverClientSpi client2 = clientManager
.acquireClientForReadData(container2.getPipeline(), true);
.acquireClient(container2.getPipeline());
assertEquals(1, client2.getRefcount());
assertThat(client2 instanceof XceiverClientGrpc);

XceiverClientSpi client3 = clientManager
.acquireClientForReadData(container1.getPipeline(), true);
assertThat(client3 instanceof XceiverClientGrpc);
.acquireClient(container1.getPipeline());
assertEquals(2, client3.getRefcount());
assertEquals(2, client1.getRefcount());
assertEquals(client1, client3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public void testAllocateShortCircuitClient() throws IOException {
clientManager.releaseClient(client2, true);
// client is still kept in the cache, for create a domain socket connection is expensive.
assertEquals(1, clientManager.getClientCache().size());

XceiverClientSpi client3 = clientManager.acquireClientForReadData(container1.getPipeline(), false);
assertTrue(client3 instanceof XceiverClientGrpc);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import java.io.DataInputStream;
import java.io.DataOutputStream;
Expand Down Expand Up @@ -87,6 +89,7 @@
/**
* Tests the XceiverServerDomainSocket class.
*/
@Timeout(300)
public class TestXceiverServerDomainSocket {
private final InetSocketAddress localhost = InetSocketAddress.createUnresolved("localhost", 10000);
@TempDir
Expand Down Expand Up @@ -115,8 +118,7 @@ public static void setup() {
}

@Test
@Timeout(30)
public void testDomainPathConfiguration() {
public void testIllegalDomainPathConfiguration() {
// empty domain path
conf.set(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH, "");
try {
Expand Down Expand Up @@ -157,25 +159,26 @@ public void testDomainPathConfiguration() {
factory.close();
}

// an existing domain path, the existing file is override and changed from a normal file to a socket file
// an existing domain path, the existing regular file will be overwritten and turned into a socket file,
// so configure an existing domain path is disallowed.
conf.set(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH, new File(dir, "ozone-socket").getAbsolutePath());
factory = DomainSocketFactory.getInstance(conf);
File file = new File(dir, "ozone-socket");
try {
File file = new File(dir, "ozone-socket");
assertTrue(file.createNewFile());
new XceiverServerDomainSocket(MockDatanodeDetails.randomDatanodeDetails(),
conf, null, readExecutors, metrics, factory);
DomainSocketFactory.getInstance(conf);
fail("an existing domain path is not allowed.");
} catch (Throwable e) {
fail("an existing domain path is supported by not recommended.");
e.printStackTrace();
assertTrue(e instanceof IllegalArgumentException);
assertTrue(e.getMessage().contains("an existing file"));
} finally {
factory.close();
file.delete();
}
}

@Test
@Timeout(30)
public void testDomainPathPermission() {
// write from everyone is not allowed
// write from everyone is not allowed (permission too open)
assertTrue(dir.setWritable(true, false));
conf.set(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH,
new File(dir, "ozone-socket").getAbsolutePath());
Expand Down Expand Up @@ -210,7 +213,7 @@ public void testDomainPathPermission() {
// write from owner is required
assertTrue(dir.setWritable(true, true));
conf.set(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH,
new File(dir, "ozone-socket").getAbsolutePath());
new File(dir, "ozone-socket-write").getAbsolutePath());
factory = DomainSocketFactory.getInstance(conf);
XceiverServerDomainSocket server = null;
try {
Expand All @@ -230,7 +233,7 @@ public void testDomainPathPermission() {
assertTrue(dir.setWritable(true, true));
assertTrue(dir.setReadable(true, true));
conf.set(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH,
new File(dir, "ozone-socket").getAbsolutePath());
new File(dir, "ozone-socket-execute").getAbsolutePath());
factory = DomainSocketFactory.getInstance(conf);
try {
new XceiverServerDomainSocket(MockDatanodeDetails.randomDatanodeDetails(),
Expand All @@ -244,12 +247,12 @@ public void testDomainPathPermission() {
dir.setExecutable(true, true);
}

// read from owner is not required?
// read from owner is not required
assertTrue(dir.setExecutable(true, true));
assertTrue(dir.setWritable(true, true));
assertTrue(dir.setReadable(false, true));
conf.set(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH,
new File(dir, "ozone-socket").getAbsolutePath());
new File(dir, "ozone-socket-read").getAbsolutePath());
factory = DomainSocketFactory.getInstance(conf);
try {
server = new XceiverServerDomainSocket(MockDatanodeDetails.randomDatanodeDetails(),
Expand All @@ -266,30 +269,18 @@ public void testDomainPathPermission() {
}

/**
* Test a successful connection and then read/write.
*/
@Test
public void testReadWrite1() throws IOException {
testReadWrite(false, false);
}

/**
* On Linux, when there is still open file handle of a deleted file, the file handle remains open and can still
* Test connection and read/write.
* On Linux, when there is still open file handle of a deleted file, the file handle remains open and can still
* be used to read and write the file.
*/
@Test
@Timeout(30)
public void testReadWrite2() throws IOException {
testReadWrite(true, false);
}

@Test
@Timeout(30)
public void testReadWrite3() throws IOException {
testReadWrite(false, true);
}

private void testReadWrite(boolean deleteFileBeforeRead, boolean deleteFileDuringRead) throws IOException {
@ParameterizedTest
@CsvSource({
"true, true",
"true, false",
"false, true",
"false, false",
})
public void testReadWrite(boolean deleteFileBeforeRead, boolean deleteFileDuringRead) throws IOException {
conf.set(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH, new File(dir, "ozone-socket").getAbsolutePath());
ContainerMetrics containerMetrics = ContainerMetrics.create(conf);
DomainSocketFactory factory = DomainSocketFactory.getInstance(conf);
Expand Down Expand Up @@ -370,7 +361,6 @@ private void testReadWrite(boolean deleteFileBeforeRead, boolean deleteFileDurin
* Test server is not listening.
*/
@Test
@Timeout(30)
public void testServerNotListening() {
conf.set(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH, new File(dir, "ozone-socket").getAbsolutePath());
DomainSocketFactory factory = DomainSocketFactory.getInstance(conf);
Expand All @@ -396,7 +386,6 @@ public void testServerNotListening() {
* Although socket can be created, read will fail, write can succeed.
*/
@Test
@Timeout(30)
public void testServerNotStart() {
conf.set(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH, new File(dir, "ozone-socket").getAbsolutePath());
DomainSocketFactory factory = DomainSocketFactory.getInstance(conf);
Expand Down Expand Up @@ -431,10 +420,9 @@ public void testServerNotStart() {
}

@Test
@Timeout(30)
public void testReadTimeout() throws InterruptedException {
conf.set(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH, new File(dir, "ozone-socket").getAbsolutePath());
conf.set(OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT, "5s");
conf.set(OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT, "2s");
DomainSocketFactory factory = DomainSocketFactory.getInstance(conf);
XceiverServerDomainSocket server = new XceiverServerDomainSocket(MockDatanodeDetails.randomDatanodeDetails(),
conf, null, readExecutors, metrics, factory);
Expand All @@ -445,7 +433,7 @@ public void testReadTimeout() throws InterruptedException {
assertTrue(sock.isOpen());

// server will close the DomainSocket if there is no message from client in OZONE_CLIENT_READ_TIMEOUT
Thread.sleep(5 * 1000);
Thread.sleep(2 * 1000);
// send request
final DataOutputStream outputStream = new DataOutputStream(sock.getOutputStream());
outputStream.writeShort(OzoneClientConfig.DATA_TRANSFER_VERSION);
Expand All @@ -468,7 +456,6 @@ public void testReadTimeout() throws InterruptedException {
}

@Test
@Timeout(30)
public void testMaxXceiverCount() throws IOException, InterruptedException {
conf.set(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH, new File(dir, "ozone-socket").getAbsolutePath());
DomainSocketFactory factory = DomainSocketFactory.getInstance(conf);
Expand Down Expand Up @@ -529,7 +516,6 @@ public void testMaxXceiverCount() throws IOException, InterruptedException {
* will treat it as a critical error, close the connection.
*/
@Test
@Timeout(30)
public void testSendIrrelevantMessage() {
conf.set(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH, new File(dir, "ozone-socket").getAbsolutePath());
DomainSocketFactory factory = DomainSocketFactory.getInstance(conf);
Expand Down Expand Up @@ -559,7 +545,6 @@ public void testSendIrrelevantMessage() {
}

@Test
@Timeout(30)
public void testSendUnsupportedRequest() throws IOException {
conf.set(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH, new File(dir, "ozone-socket").getAbsolutePath());
DomainSocketFactory factory = DomainSocketFactory.getInstance(conf);
Expand Down

0 comments on commit ce8b05e

Please sign in to comment.