From 6fd4fea748e0516b40b0a79456e3caaf1f6ab547 Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Tue, 1 Oct 2024 01:06:07 +0800 Subject: [PATCH] HADOOP-19261. Support force close a DomainSocket for server service (#7057) --- .../apache/hadoop/net/unix/DomainSocket.java | 71 ++++++++++++------- .../net/unix/TemporarySocketDirectory.java | 4 +- .../hadoop/net/unix/TestDomainSocket.java | 61 +++++++++------- 3 files changed, 84 insertions(+), 52 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java index 73fff0313a58c..3edd349efba91 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java @@ -339,10 +339,13 @@ private static native void closeFileDescriptor0(FileDescriptor fd) private static native void shutdown0(int fd) throws IOException; /** - * Close the Socket. + * Close the Server Socket without check refCount. + * When Server Socket is blocked on accept(), its refCount is 1. + * close() call on Server Socket will be stuck in the while loop count check. + * @param force if true, will not check refCount before close socket. + * @throws IOException raised on errors performing I/O. */ - @Override - public void close() throws IOException { + public void close(boolean force) throws IOException { // Set the closed bit on this DomainSocket int count; try { @@ -351,41 +354,61 @@ public void close() throws IOException { // Someone else already closed the DomainSocket. return; } - // Wait for all references to go away - boolean didShutdown = false; + boolean interrupted = false; - while (count > 0) { - if (!didShutdown) { + if (force) { + try { + // Calling shutdown on the socket will interrupt blocking system + // calls like accept, write, and read that are going on in a + // different thread. + shutdown0(fd); + } catch (IOException e) { + LOG.error("shutdown error: ", e); + } + } else { + // Wait for all references to go away + boolean didShutdown = false; + while (count > 0) { + if (!didShutdown) { + try { + // Calling shutdown on the socket will interrupt blocking system + // calls like accept, write, and read that are going on in a + // different thread. + shutdown0(fd); + } catch (IOException e) { + LOG.error("shutdown error: ", e); + } + didShutdown = true; + } try { - // Calling shutdown on the socket will interrupt blocking system - // calls like accept, write, and read that are going on in a - // different thread. - shutdown0(fd); - } catch (IOException e) { - LOG.error("shutdown error: ", e); + Thread.sleep(10); + } catch (InterruptedException e) { + interrupted = true; } - didShutdown = true; + count = refCount.getReferenceCount(); } - try { - Thread.sleep(10); - } catch (InterruptedException e) { - interrupted = true; - } - count = refCount.getReferenceCount(); } - // At this point, nobody has a reference to the file descriptor, + // At this point, nobody has a reference to the file descriptor, // and nobody will be able to get one in the future either. // We now call close(2) on the file descriptor. - // After this point, the file descriptor number will be reused by - // something else. Although this DomainSocket object continues to hold - // the old file descriptor number (it's a final field), we never use it + // After this point, the file descriptor number will be reused by + // something else. Although this DomainSocket object continues to hold + // the old file descriptor number (it's a final field), we never use it // again because this DomainSocket is closed. close0(fd); if (interrupted) { Thread.currentThread().interrupt(); } } + + /** + * Close the Socket. + */ + @Override + public void close() throws IOException { + close(false); + } /** * Call shutdown(SHUT_RDWR) on the UNIX domain socket. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java index c00b4b259aace..40399f07a29e7 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java @@ -20,7 +20,6 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; -import java.util.Random; import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.FileUtil; @@ -35,8 +34,7 @@ public class TemporarySocketDirectory implements Closeable { public TemporarySocketDirectory() { String tmp = System.getProperty("java.io.tmpdir", "/tmp"); - dir = new File(tmp, "socks." + (System.currentTimeMillis() + - "." + (new Random().nextInt()))); + dir = new File(tmp, "socks." + System.nanoTime()); dir.mkdirs(); FileUtil.setWritable(dir, true); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java index 61cbd85f8d69f..952f2b35e4314 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java @@ -130,7 +130,7 @@ public Void call(){ DomainSocket conn = DomainSocket.connect(serv.getPath()); Thread.sleep(50); conn.close(); - serv.close(); + serv.close(true); future.get(2, TimeUnit.MINUTES); } @@ -161,7 +161,7 @@ public Void call(){ }; Future future = exeServ.submit(callable); Thread.sleep(500); - serv.close(); + serv.close(true); future.get(2, TimeUnit.MINUTES); } @@ -240,7 +240,7 @@ public Void call(){ Future clientFuture = exeServ.submit(clientCallable); Thread.sleep(500); clientConn.close(); - serv.close(); + serv.close(true); clientFuture.get(2, TimeUnit.MINUTES); serverFuture.get(2, TimeUnit.MINUTES); } @@ -281,28 +281,39 @@ public void testServerOptions() throws Exception { final String TEST_PATH = new File(sockDir.getDir(), "test_sock_server_options").getAbsolutePath(); DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH); - try { - // Let's set a new receive buffer size - int bufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE); - int newBufSize = bufSize / 2; - serv.setAttribute(DomainSocket.RECEIVE_BUFFER_SIZE, newBufSize); - int nextBufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE); - Assert.assertEquals(newBufSize, nextBufSize); - // Let's set a server timeout - int newTimeout = 1000; - serv.setAttribute(DomainSocket.RECEIVE_TIMEOUT, newTimeout); - int nextTimeout = serv.getAttribute(DomainSocket.RECEIVE_TIMEOUT); - Assert.assertEquals(newTimeout, nextTimeout); - try { - serv.accept(); - Assert.fail("expected the accept() to time out and fail"); - } catch (SocketTimeoutException e) { - GenericTestUtils.assertExceptionContains("accept(2) error: ", e); + // Let's set a new receive buffer size + int bufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE); + int newBufSize = bufSize / 2; + serv.setAttribute(DomainSocket.RECEIVE_BUFFER_SIZE, newBufSize); + int nextBufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE); + Assert.assertEquals(newBufSize, nextBufSize); + // Let's set a server timeout + int newTimeout = 1000; + serv.setAttribute(DomainSocket.RECEIVE_TIMEOUT, newTimeout); + int nextTimeout = serv.getAttribute(DomainSocket.RECEIVE_TIMEOUT); + Assert.assertEquals(newTimeout, nextTimeout); + + ExecutorService exeServ = Executors.newSingleThreadExecutor(); + Callable callable = new Callable() { + public Void call() { + try { + serv.accept(); + Assert.fail("expected the accept() to time out and fail"); + } catch (SocketTimeoutException e) { + GenericTestUtils.assertExceptionContains("accept(2) error: ", e); + } catch (AsynchronousCloseException e) { + return null; + } catch (IOException e) { + throw new RuntimeException("unexpected IOException", e); + } + return null; } - } finally { - serv.close(); - Assert.assertFalse(serv.isOpen()); - } + }; + Future future = exeServ.submit(callable); + Thread.sleep(500); + serv.close(true); + future.get(); + Assert.assertFalse(serv.isOpen()); } /** @@ -656,7 +667,7 @@ public void run(){ } serverThread.join(120000); clientThread.join(120000); - serv.close(); + serv.close(true); for (PassedFile pf : passedFiles) { pf.cleanup(); }