fixed failed UT, and use customized hadoop image
ChenSammi committed Dec 5, 2024
1 parent a6bea06 commit ea2d594
Showing 19 changed files with 72 additions and 84 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -31,7 +31,7 @@ env:
# Minimum required Java version for running Ozone is defined in pom.xml (javac.version).
TEST_JAVA_VERSION: 17 # JDK version used by CI build and tests; should match the JDK version in apache/ozone-runner image
MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
HADOOP_IMAGE: ghcr.io/apache/hadoop
HADOOP_IMAGE: ghcr.io/chensammi/hadoop
OZONE_IMAGE: ghcr.io/apache/ozone
OZONE_RUNNER_IMAGE: ghcr.io/apache/ozone-runner
OZONE_WITH_COVERAGE: ${{ github.event_name == 'push' }}
@@ -23,9 +23,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigType;
@@ -426,7 +426,7 @@ private void acquireClient() throws IOException {
// fall back to acquire GRPC client
if (xceiverClientFactory != null && xceiverClientGrpc == null) {
try {
xceiverClientGrpc = xceiverClientFactory.acquireClientForReadData(pipeline, false);
xceiverClientGrpc = xceiverClientFactory.acquireClientForReadData(pipeline);
} catch (IOException ioe) {
LOG.warn("Failed to acquire client for pipeline {}, block {}", pipeline, blockID);
throw ioe;
@@ -26,7 +26,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Timer;
@@ -153,12 +152,6 @@ private DomainSocketFactory(ConfigurationSource conf) {
LOG.warn(FEATURE + " cannot be used because " + nativeLibraryLoadFailureReason);
pathInfo = PathInfo.DISABLED;
} else {
File file = new File(domainSocketPath);
if (file.exists()) {
throw new IllegalArgumentException(FEATURE + " is enabled but "
+ OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH + " is an existing " +
(file.isDirectory() ? "directory" : "file"));
}
pathInfo = PathInfo.VALID;
isEnabled = true;
timer = new Timer(DomainSocketFactory.class.getSimpleName() + "-Timer");
@@ -274,9 +267,11 @@ public Timer getTimer() {
}

public static synchronized void close() {
if (instance.getTimer() != null) {
instance.getTimer().cancel();
if (instance != null) {
if (instance.getTimer() != null) {
instance.getTimer().cancel();
}
DomainSocketFactory.instance = null;
}
DomainSocketFactory.instance = null;
}
}
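
Taken together with the removed existence check above, DomainSocketFactory.getInstance() no longer rejects a pre-existing socket path, and close() is safe to call even if no instance was ever created or if it runs twice. A minimal lifecycle sketch (imports omitted; the classes are the ones shown in the surrounding diff, and the socket path is a hypothetical example value):

```java
// Sketch of the singleton lifecycle after this change; "/var/lib/ozone/dn_socket" is illustrative only.
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH, "/var/lib/ozone/dn_socket");

DomainSocketFactory factory = DomainSocketFactory.getInstance(conf);
try {
  // ... create short-circuit sockets via the factory ...
} finally {
  DomainSocketFactory.close(); // cancels the timer (if any) and clears the singleton
  DomainSocketFactory.close(); // a second call is now a harmless no-op thanks to the null check
}
```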
11 changes: 10 additions & 1 deletion hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2003,7 +2003,16 @@
<value>30s</value>
<tag>OZONE, CLIENT, MANAGEMENT</tag>
<description>
Timeout for ozone grpc client during read.
Timeout for ozone grpc and short-circuit client during read.
</description>
</property>

<property>
<name>ozone.client.write.timeout</name>
<value>30s</value>
<tag>OZONE, CLIENT, MANAGEMENT</tag>
<description>
Timeout for ozone short-circuit client during write.
</description>
</property>
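
As a usage illustration, both timeouts can be overridden programmatically before a client is created. This is a minimal sketch: the write key is taken from the property above, the read key name is assumed to be ozone.client.read.timeout since it is not visible in this hunk, and the 60s values are arbitrary examples.

```java
import org.apache.hadoop.hdds.conf.OzoneConfiguration;

public class ClientTimeoutExample {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Read timeout covers gRPC and short-circuit reads (key name assumed, not shown in this hunk).
    conf.set("ozone.client.read.timeout", "60s");
    // Write timeout covers the short-circuit client (key introduced by this commit).
    conf.set("ozone.client.write.timeout", "60s");
    System.out.println(conf.get("ozone.client.write.timeout")); // -> 60s
  }
}
```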

@@ -296,7 +296,7 @@ private boolean isSupportedCmdType(ContainerProtos.Type type) {
return type == ContainerProtos.Type.GetBlock || type == ContainerProtos.Type.Echo;
}

class TaskEntry {
static class TaskEntry {
private ContainerCommandRequestProto request;
private ContainerCommandResponseProto response;
private FileInputStream fis;
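
Making TaskEntry a static nested class removes the implicit reference that a non-static inner class keeps to its enclosing instance, which matters for entries that may outlive a single request. A small generic illustration of the difference (standalone example, not project code):

```java
public class Outer {
  // Non-static inner class: every instance silently holds a reference to Outer.this,
  // which can keep the outer object (and everything it references) alive.
  class Inner { }

  // Static nested class: no hidden reference to an Outer instance.
  static class Nested { }

  public static void main(String[] args) {
    Outer outer = new Outer();
    Inner inner = outer.new Inner(); // requires an enclosing instance
    Nested nested = new Nested();    // can be created on its own
    System.out.println(inner + " " + nested);
  }
}
```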
@@ -229,7 +229,7 @@ public void run() {
LOG.info("XceiverServerDomainSocket is closed", ace);
} catch (IOException ie) {
// usually when the xceiver count limit is hit.
LOG.warn("Got an exception. Peer {}", peer.toString(), ie);
LOG.warn("Got an exception. Peer {}", peer, ie);
IOUtils.closeQuietly(peer);
} catch (OutOfMemoryError ie) {
IOUtils.closeQuietly(peer);
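
The logging tweak relies on two standard SLF4J behaviors: placeholder arguments are rendered with toString() only if the message is actually logged, and a Throwable passed as the last argument is printed with its stack trace, so the explicit peer.toString() was unnecessary. A small self-contained example:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingExample {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingExample.class);

  public static void main(String[] args) {
    Object peer = new Object();
    Exception ie = new RuntimeException("simulated failure");
    // The placeholder is formatted lazily, and the trailing exception is logged with its stack trace.
    LOG.warn("Got an exception. Peer {}", peer, ie);
  }
}
```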
4 changes: 2 additions & 2 deletions hadoop-ozone/dev-support/checks/_lib.sh
@@ -161,10 +161,10 @@ download_hadoop_aws() {
if [[ ! -e "${dir}" ]] || [[ ! -d "${dir}"/src/test/resources ]]; then
mkdir -p "${dir}"
if [[ ! -f "${dir}.tar.gz" ]]; then
local url="https://archive.apache.org/dist/hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}-src.tar.gz"
local url="https://github.com/ChenSammi/hadoop/archive/refs/tags/release-${HADOOP_VERSION}-RC0.tar.gz"
echo "Downloading Hadoop from ${url}"
curl -LSs --fail -o "${dir}.tar.gz" "$url" || return 1
fi
tar -x -z -C "${dir}" --strip-components=3 -f "${dir}.tar.gz" --wildcards 'hadoop-*-src/hadoop-tools/hadoop-aws' || return 1
tar -x -z -C "${dir}" --strip-components=3 -f "${dir}.tar.gz" --wildcards 'hadoop-*/hadoop-tools/hadoop-aws' || return 1
fi
}
1 change: 0 additions & 1 deletion hadoop-ozone/dist/src/main/compose/ozone-ha/.env
@@ -15,7 +15,6 @@
# limitations under the License.

HDDS_VERSION=${hdds.version}
HADOOP_IMAGE=apache/hadoop
OZONE_RUNNER_VERSION=${docker.ozone-runner.version}
OZONE_RUNNER_IMAGE=apache/ozone-runner
OZONE_OPTS=
1 change: 0 additions & 1 deletion hadoop-ozone/dist/src/main/compose/ozone/.env
@@ -15,7 +15,6 @@
# limitations under the License.

HDDS_VERSION=${hdds.version}
HADOOP_IMAGE=apache/hadoop
OZONE_RUNNER_VERSION=${docker.ozone-runner.version}
OZONE_RUNNER_IMAGE=apache/ozone-runner
OZONE_OPTS=
2 changes: 1 addition & 1 deletion hadoop-ozone/dist/src/main/compose/ozonesecure-ha/.env
@@ -15,7 +15,7 @@
# limitations under the License.

HDDS_VERSION=${hdds.version}
HADOOP_IMAGE=apache/hadoop
HADOOP_IMAGE=${hadoop.image}
HADOOP_VERSION=${hadoop.version}
OZONE_RUNNER_VERSION=${docker.ozone-runner.version}
OZONE_RUNNER_IMAGE=apache/ozone-runner
2 changes: 1 addition & 1 deletion hadoop-ozone/dist/src/main/compose/ozonesecure-mr/.env
@@ -15,7 +15,7 @@
# limitations under the License.

HDDS_VERSION=${hdds.version}
HADOOP_IMAGE=apache/hadoop
HADOOP_IMAGE=${hadoop.image}
HADOOP_VERSION=${hadoop.version}
OZONE_RUNNER_VERSION=${docker.ozone-runner.version}
OZONE_RUNNER_IMAGE=apache/ozone-runner
2 changes: 1 addition & 1 deletion hadoop-ozone/dist/src/main/compose/ozonesecure/.env
@@ -15,7 +15,7 @@
# limitations under the License.

HDDS_VERSION=${hdds.version}
HADOOP_IMAGE=apache/hadoop
HADOOP_IMAGE=${hadoop.image}
HADOOP_VERSION=${hadoop.version}
OZONE_RUNNER_VERSION=${docker.ozone-runner.version}
OZONE_RUNNER_IMAGE=apache/ozone-runner
2 changes: 1 addition & 1 deletion hadoop-ozone/dist/src/main/compose/upgrade/compose/ha/.env
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

HADOOP_IMAGE=apache/hadoop
HADOOP_IMAGE=${hadoop.image}
HADOOP_VERSION=${hadoop.version}
HDDS_VERSION=${hdds.version}
OZONE_RUNNER_VERSION=${docker.ozone-runner.version}
5 changes: 3 additions & 2 deletions hadoop-ozone/dist/src/main/smoketest/s3/bucketcreate.robot
@@ -42,13 +42,14 @@ Create bucket with invalid bucket name
${result} = Execute AWSS3APICli and checkrc create-bucket --bucket invalid_bucket_${randStr} 255
Should contain ${result} InvalidBucketName

Create new bucket and check no group ACL
Create new bucket and check default group ACL
${bucket} = Create bucket
${acl} = Execute ozone sh bucket getacl s3v/${bucket}
${group} = Get Regexp Matches ${acl} "GROUP"
IF '${group}' is not '[]'
${json} = Evaluate json.loads('''${acl}''') json
# make sure this check is for group acl
Should contain ${json}[1][type] GROUP
Should contain ${json}[1][aclList] NONE
Should contain ${json}[1][aclList] READ
Should contain ${json}[1][aclList] LIST
END
@@ -18,6 +18,7 @@

import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -38,6 +39,7 @@
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerDomainSocket;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -158,21 +160,22 @@ public void testIllegalDomainPathConfiguration() {
} finally {
factory.close();
}
}

// for an existing domain path, the existing regular file will be overwritten and turned into a socket file,
// so configuring an existing domain path is disallowed.
@Test
public void testExistingDomainPath() {
// an existing domain path: the existing file is overwritten and changed from a regular file to a socket file
conf.set(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH, new File(dir, "ozone-socket").getAbsolutePath());
File file = new File(dir, "ozone-socket");
DomainSocketFactory factory = DomainSocketFactory.getInstance(conf);
try {
File file = new File(dir, "ozone-socket");
assertTrue(file.createNewFile());
DomainSocketFactory.getInstance(conf);
fail("an existing domain path is not allowed.");
new XceiverServerDomainSocket(MockDatanodeDetails.randomDatanodeDetails(),
conf, null, readExecutors, metrics, factory);
} catch (Throwable e) {
e.printStackTrace();
assertTrue(e instanceof IllegalArgumentException);
assertTrue(e.getMessage().contains("an existing file"));
fail("an existing domain path is supported by not recommended.");
} finally {
file.delete();
factory.close();
}
}

@@ -347,12 +350,7 @@ public void testReadWrite(boolean deleteFileBeforeRead, boolean deleteFileDuring
assertEquals(1, containerMetrics.getContainerLocalOpsMetrics(ContainerProtos.Type.GetBlock));
} finally {
factory.close();
if (sock != null) {
try {
sock.close();
} catch (IOException e) {
}
}
IOUtils.closeQuietly(sock);
server.stop();
}
}
@@ -372,12 +370,7 @@ public void testServerNotListening() {
assertTrue(e.getMessage().contains("connect(2) error: No such file or directory"));
} finally {
factory.close();
if (sock != null) {
try {
sock.close();
} catch (IOException e) {
}
}
IOUtils.closeQuietly(sock);
}
}

@@ -392,29 +385,28 @@ public void testServerNotStart() {
XceiverServerDomainSocket server = new XceiverServerDomainSocket(MockDatanodeDetails.randomDatanodeDetails(),
conf, null, readExecutors, metrics, factory);
DomainSocket sock = null;
DataOutputStream outputStream = null;
DataInputStream inputStream = null;
try {
sock = factory.createSocket(readTimeout, writeTimeout, localhost);
assertTrue(sock.isOpen());
// send request
final DataOutputStream outputStream = new DataOutputStream(sock.getOutputStream());
outputStream = new DataOutputStream(sock.getOutputStream());
outputStream.writeShort(OzoneClientConfig.DATA_TRANSFER_VERSION);
outputStream.writeShort(GetBlock.getNumber());
getBlockRequest().writeDelimitedTo(outputStream);
outputStream.flush();

final DataInputStream inputStream = new DataInputStream(sock.getInputStream());
inputStream = new DataInputStream(sock.getInputStream());
inputStream.readShort();
} catch (IOException e) {
assertTrue(e instanceof SocketTimeoutException);
assertTrue(e.getMessage().contains("read(2) error: Resource temporarily unavailable"));
} finally {
factory.close();
if (sock != null) {
try {
sock.close();
} catch (IOException e) {
}
}
IOUtils.closeQuietly(outputStream);
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(sock);
server.stop();
}
}
@@ -445,19 +437,17 @@ public void testReadTimeout() throws InterruptedException {
assertTrue(e.getMessage().contains("write(2) error: Broken pipe"));
} finally {
factory.close();
if (sock != null) {
try {
sock.close();
} catch (IOException e) {
}
}
IOUtils.closeQuietly(sock);
server.stop();
}
}

@Test
public void testMaxXceiverCount() throws IOException, InterruptedException {
conf.set(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH, new File(dir, "ozone-socket").getAbsolutePath());
DatanodeConfiguration datanodeConfiguration = conf.getObject(DatanodeConfiguration.class);
datanodeConfiguration.setNumReadThreadPerVolume(2);
conf.setFromObject(datanodeConfiguration);
DomainSocketFactory factory = DomainSocketFactory.getInstance(conf);
XceiverServerDomainSocket server = new XceiverServerDomainSocket(MockDatanodeDetails.randomDatanodeDetails(),
conf, null, readExecutors, metrics, factory);
@@ -466,20 +456,21 @@ public void testMaxXceiverCount() throws IOException, InterruptedException {
GenericTestUtils.LogCapturer.captureLogs(XceiverServerDomainSocket.LOG);
try {
server.start();
// test max allowed xceiver count(default 10 * 5)
int count = 51;
// test max allowed xceiver count(2 * 5)
int count = 11;
for (int i = 1; i <= count; i++) {
DomainSocket sock = factory.createSocket(readTimeout, writeTimeout, localhost);
list.add(sock);
}

logCapturer.getOutput().contains("Xceiver count exceeds the limit" + (count - 1));
Thread.sleep(2000);
assertTrue(logCapturer.getOutput().contains("Xceiver count exceeds the limit " + (count - 1)));
DomainSocket lastSock = list.get(list.size() - 1);
// although the remote peer is already closed because the limit was exceeded, sock.isOpen() still returns true.
// Only when the client reads from or writes to the socket stream will it see an exception or get -1 returned.
assertTrue(lastSock.isOpen());

// write to first 50 sockets should be OK
// write to first 10 sockets should be OK
for (int i = 0; i < count - 2; i++) {
DomainSocket sock = list.get(i);
assertTrue(sock.isOpen());
@@ -489,7 +480,6 @@
assertFalse(sock.isOpen());
}

Thread.sleep(5000);
// read a broken pipe will return -1
int data = lastSock.getInputStream().read();
assertEquals(-1, data);
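
For the arithmetic behind the revised socket count: the limit is presumed to scale with the per-volume read thread setting, and the multiplier of 5 is inferred only from the comments "(default 10 * 5)" and "(2 * 5)" in this diff, not from the server implementation. A small sketch of how the test's count of 11 comes about:

```java
// Assumed relationship, taken from the in-test comments rather than XceiverServerDomainSocket itself.
int readThreadsPerVolume = 2;                           // set via DatanodeConfiguration above
int assumedMaxXceiverCount = readThreadsPerVolume * 5;  // = 10
int count = assumedMaxXceiverCount + 1;                 // = 11, one past the limit, to trigger the warning
```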
@@ -522,24 +512,21 @@ public void testSendIrrelevantMessage() {
XceiverServerDomainSocket server = new XceiverServerDomainSocket(MockDatanodeDetails.randomDatanodeDetails(),
conf, null, readExecutors, metrics, factory);
DomainSocket sock = null;
DataOutputStream outputStream = null;
String data = "hello world";
try {
server.start();
sock = factory.createSocket(readTimeout, writeTimeout, localhost);
final DataOutputStream outputStream = new DataOutputStream(sock.getOutputStream());
outputStream.write(data.getBytes());
outputStream = new DataOutputStream(sock.getOutputStream());
outputStream.write(data.getBytes(StandardCharsets.UTF_8));
outputStream.flush();
sock.getInputStream().read();
} catch (IOException e) {
assertTrue(e instanceof EOFException);
} finally {
factory.close();
if (sock != null) {
try {
sock.close();
} catch (IOException e) {
}
}
IOUtils.closeQuietly(outputStream);
IOUtils.closeQuietly(sock);
server.stop();
}
}
@@ -573,12 +560,7 @@ public void testSendUnsupportedRequest() throws IOException {
assertTrue(responseProto.getResult() == ContainerProtos.Result.UNSUPPORTED_REQUEST);
} finally {
factory.close();
if (sock != null) {
try {
sock.close();
} catch (IOException e) {
}
}
IOUtils.closeQuietly(sock);
server.stop();
}
}
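
Several of the cleanup blocks above were simplified with commons-io's IOUtils.closeQuietly, which is null-safe and swallows IOException, so it suits finally blocks in tests. A minimal standalone example:

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import org.apache.commons.io.IOUtils;

public class CloseQuietlyExample {
  public static void main(String[] args) {
    InputStream in = new ByteArrayInputStream(new byte[] {1, 2, 3});
    // Equivalent to a try { close(); } catch (IOException ignored) {} block.
    IOUtils.closeQuietly(in);
    // Passing null is also safe; nothing is thrown.
    IOUtils.closeQuietly((InputStream) null);
  }
}
```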