diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 5e6a31b323f55..d1fdda65f127a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -18,8 +18,15 @@ */ package org.apache.pulsar.common.configuration; +import com.google.common.annotations.VisibleForTesting; import java.io.File; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.time.Clock; +import java.util.Arrays; import java.util.function.Supplier; +import java.util.stream.Collectors; import javax.servlet.ServletContext; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -27,6 +34,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.Response.Status; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.util.ThreadDumpUtil; /** * Web resource used by the VIP service to check to availability of the service instance. @@ -38,25 +46,84 @@ public class VipStatus { public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath"; public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe"; + // log a full thread dump when a deadlock is detected in status check once every 10 minutes + // to prevent excessive logging + private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L; + // Rate limit status checks to every 500ms to prevent DoS + private static final long CHECK_STATUS_INTERVAL = 500L; + + private static volatile long lastCheckStatusTimestamp; + private static volatile long lastPrintThreadDumpTimestamp; + private static volatile boolean lastCheckStatusResult; + + private long printThreadDumpIntervalMs; + private Clock clock; + @Context protected ServletContext servletContext; + public VipStatus() { + this.clock = Clock.systemUTC(); + this.printThreadDumpIntervalMs = LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED; + } + + @VisibleForTesting + public VipStatus(ServletContext servletContext, long printThreadDumpIntervalMs) { + this.servletContext = servletContext; + this.printThreadDumpIntervalMs = printThreadDumpIntervalMs; + this.clock = Clock.systemUTC(); + } + @GET public String checkStatus() { - String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH); - @SuppressWarnings("unchecked") - Supplier isReadyProbe = (Supplier) servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE); + // Locking classes to avoid deadlock detection in multi-thread concurrent requests. + synchronized (VipStatus.class) { + if (clock.millis() - lastCheckStatusTimestamp < CHECK_STATUS_INTERVAL) { + if (lastCheckStatusResult) { + return "OK"; + } else { + throw new WebApplicationException(Status.SERVICE_UNAVAILABLE); + } + } + lastCheckStatusTimestamp = clock.millis(); - boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true; + String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH); + @SuppressWarnings("unchecked") + Supplier isReadyProbe = (Supplier) servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE); + boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true; - if (statusFilePath != null) { - File statusFile = new File(statusFilePath); - if (isReady && statusFile.exists() && statusFile.isFile()) { - return "OK"; + if (statusFilePath != null) { + File statusFile = new File(statusFilePath); + if (isReady && statusFile.exists() && statusFile.isFile()) { + // check deadlock + ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); + long[] threadIds = threadBean.findDeadlockedThreads(); + if (threadIds != null && threadIds.length > 0) { + ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds, false, + false); + String threadNames = Arrays.stream(threadInfos) + .map(threadInfo -> threadInfo.getThreadName() + + "(tid=" + threadInfo.getThreadId() + ")") + .collect(Collectors.joining(", ")); + if (clock.millis() - lastPrintThreadDumpTimestamp > printThreadDumpIntervalMs) { + String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString(); + log.error("Deadlock detected, service may be unavailable, " + + "thread stack details are as follows: {}.", diagnosticResult); + lastPrintThreadDumpTimestamp = clock.millis(); + } else { + log.error("Deadlocked threads detected. {}", threadNames); + } + lastCheckStatusResult = false; + throw new WebApplicationException(Status.SERVICE_UNAVAILABLE); + } else { + lastCheckStatusResult = true; + return "OK"; + } + } } + lastCheckStatusResult = false; + log.warn("Failed to access \"status.html\". The service is not ready"); + throw new WebApplicationException(Status.NOT_FOUND); } - log.warn("Failed to access \"status.html\". The service is not ready"); - throw new WebApplicationException(Status.NOT_FOUND); } - -} +} \ No newline at end of file diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java new file mode 100644 index 0000000000000..d98af5d2483ec --- /dev/null +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.configuration; + +import static org.testng.Assert.assertEquals; +import java.io.File; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; +import javax.servlet.ServletContext; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.mockito.Mockito; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +@Slf4j +public class VipStatusTest { + + public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath"; + public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe"; + private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 10000L; + // Rate limit status checks to every 500ms to prevent DoS + private static final long CHECK_STATUS_INTERVAL = 500L; + + private ServletContext mockServletContext; + private VipStatus vipStatus; + + @BeforeTest + public void setup() throws IOException { + String statusFilePath = "/tmp/status.html"; + File file = new File(statusFilePath); + file.createNewFile(); + Supplier isReadyProbe = () -> true; + + mockServletContext = Mockito.mock(ServletContext.class); + Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH)).thenReturn(statusFilePath); + Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE)).thenReturn(isReadyProbe); + + vipStatus = new VipStatus(mockServletContext, LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED); + } + + @Test + public void testVipStatusCheckStatus() { + // No deadlocks + testVipStatusCheckStatusWithoutDeadlock(); + // There is a deadlock + testVipStatusCheckStatusWithDeadlock(); + } + + @AfterTest + public void release() throws IOException { + String statusFilePath = "/tmp/status.html"; + File file = new File(statusFilePath); + file.deleteOnExit(); + } + + public void testVipStatusCheckStatusWithoutDeadlock() { + assertEquals(vipStatus.checkStatus(), "OK"); + } + + public void testVipStatusCheckStatusWithDeadlock() { + MockDeadlock.startDeadlock(); + boolean asExpected = true; + try { + vipStatus.checkStatus(); + asExpected = false; + System.out.println("Simulated deadlock, no deadlock detected, not as expected."); + } catch (Exception wae) { + System.out.println("Simulated deadlock and detected it, as expected."); + } finally { + MockDeadlock.executorService.shutdownNow(); + } + + if (!asExpected) { + throw new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE); + } + } + + public class MockDeadlock { + private static ExecutorService executorService = Executors.newCachedThreadPool(); + private static ReentrantLock lockA = new ReentrantLock(); + private static ReentrantLock lockB = new ReentrantLock(); + + @SneakyThrows + public static void startDeadlock() { + executorService.execute(new ThreadOne()); + executorService.execute(new ThreadTwo()); + Thread.sleep(CHECK_STATUS_INTERVAL); + } + + private static class ThreadOne implements Runnable { + @Override + public void run() { + try { + lockA.lock(); + System.out.println("ThreadOne acquired lockA"); + Thread.sleep(100); + while (!lockB.tryLock(1, TimeUnit.SECONDS)) { + System.out.println("ThreadOne acquired lockB"); + } + } catch (InterruptedException e) { + //e.printStackTrace(); + } finally { + lockA.unlock(); + } + } + } + + private static class ThreadTwo implements Runnable { + @Override + public void run() { + try { + lockB.lock(); + System.out.println("ThreadOne acquired lockB"); + Thread.sleep(100); + while (!lockA.tryLock(1, TimeUnit.SECONDS)) { + System.out.println("ThreadOne acquired lockA"); + } + } catch (InterruptedException e) { + //e.printStackTrace(); + } finally { + lockB.unlock(); + } + } + } + } +}