From e201bfcbe8d9e163b3a2bba05c4e51d20663f618 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Thu, 12 Dec 2024 11:19:50 +0800 Subject: [PATCH 1/7] HADOOP-19362. RPC metrics should be updated correctly when call is defered. --- .../apache/hadoop/ipc/ProtobufRpcEngine.java | 27 +++++++--- .../apache/hadoop/ipc/ProtobufRpcEngine2.java | 26 +++++++--- .../java/org/apache/hadoop/ipc/Server.java | 52 +++++++++++++++++-- 3 files changed, 86 insertions(+), 19 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index df0f734d08016..dae7a2db52835 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.retry.RetryPolicy; @@ -34,7 +35,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.tracing.TraceScope; import org.apache.hadoop.tracing.Tracer; import org.apache.hadoop.util.Time; @@ -393,28 +393,39 @@ static class ProtobufRpcEngineCallbackImpl private final RPC.Server server; private final Call call; private final String methodName; - private final long setupTime; + private final long callStartNanos; public ProtobufRpcEngineCallbackImpl() { this.server = CURRENT_CALL_INFO.get().getServer(); 
this.call = Server.getCurCall().get(); this.methodName = CURRENT_CALL_INFO.get().getMethodName(); - this.setupTime = Time.now(); + this.callStartNanos = Server.getCurCallStartNanos().get(); + } + + private void updateProcessingDetails(Call call, long deltaNanos) { + ProcessingDetails details = call.getProcessingDetails(); + call.getProcessingDetails().set(ProcessingDetails.Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS); + deltaNanos -= details.get(ProcessingDetails.Timing.LOCKWAIT, TimeUnit.NANOSECONDS); + deltaNanos -= details.get(ProcessingDetails.Timing.LOCKSHARED, TimeUnit.NANOSECONDS); + deltaNanos -= details.get(ProcessingDetails.Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS); + details.set(ProcessingDetails.Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS); } @Override public void setResponse(Message message) { - long processingTime = Time.now() - setupTime; + long deltaNanos = Time.monotonicNowNanos() - callStartNanos; + updateProcessingDetails(call, deltaNanos); call.setDeferredResponse(RpcWritable.wrap(message)); - server.updateDeferredMetrics(methodName, processingTime); + server.updateDeferredMetrics(call, methodName, deltaNanos); } @Override public void error(Throwable t) { - long processingTime = Time.now() - setupTime; - String detailedMetricsName = t.getClass().getSimpleName(); - server.updateDeferredMetrics(detailedMetricsName, processingTime); + long deltaNanos = Time.monotonicNowNanos() - callStartNanos; + updateProcessingDetails(call, deltaNanos); call.setDeferredError(t); + String detailedMetricsName = t.getClass().getSimpleName(); + server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java index bedecc8851d6a..fdfc26e4cb011 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java @@ -25,6 +25,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; +import org.apache.hadoop.ipc.ProcessingDetails.Timing; import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngine2Protos.RequestHeaderProto; import org.apache.hadoop.security.UserGroupInformation; @@ -425,28 +426,39 @@ static class ProtobufRpcEngineCallbackImpl private final RPC.Server server; private final Call call; private final String methodName; - private final long setupTime; + private final long callStartNanos; ProtobufRpcEngineCallbackImpl() { this.server = CURRENT_CALL_INFO.get().getServer(); this.call = Server.getCurCall().get(); this.methodName = CURRENT_CALL_INFO.get().getMethodName(); - this.setupTime = Time.now(); + this.callStartNanos = Server.getCurCallStartNanos().get(); } + private void updateProcessingDetails(Call call, long deltaNanos) { + ProcessingDetails details = call.getProcessingDetails(); + call.getProcessingDetails().set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS); + deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS); + deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS); + deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS); + details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS); + } + @Override public void setResponse(Message message) { - long processingTime = Time.now() - setupTime; + long deltaNanos = Time.monotonicNowNanos() - callStartNanos; + updateProcessingDetails(call, deltaNanos); call.setDeferredResponse(RpcWritable.wrap(message)); - server.updateDeferredMetrics(methodName, processingTime); + server.updateDeferredMetrics(call, methodName, deltaNanos); } @Override public 
void error(Throwable t) { - long processingTime = Time.now() - setupTime; - String detailedMetricsName = t.getClass().getSimpleName(); - server.updateDeferredMetrics(detailedMetricsName, processingTime); + long deltaNanos = Time.monotonicNowNanos() - callStartNanos; + updateProcessingDetails(call, deltaNanos); call.setDeferredError(t); + String detailedMetricsName = t.getClass().getSimpleName(); + server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 7303f8afce194..4ef44fb5e9183 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -351,13 +351,19 @@ public static Server get() { * after the call returns. */ private static final ThreadLocal CurCall = new ThreadLocal(); - + + private static final ThreadLocal CurCallStartNanos = new ThreadLocal(); + /** @return Get the current call. */ @VisibleForTesting public static ThreadLocal getCurCall() { return CurCall; } - + + public static ThreadLocal getCurCallStartNanos() { + return CurCallStartNanos; + } + /** * Returns the currently active RPC call's sequential ID number. A negative * call ID indicates an invalid value, such as if there is no currently active @@ -638,7 +644,8 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped rpcMetrics.addRpcQueueTime(queueTime); if (call.isResponseDeferred() || connDropped) { - // call was skipped; don't include it in processing metrics + // The call was skipped; don't include it in processing metrics. + // Will update metrics in method updateDeferredMetrics. 
return; } @@ -668,9 +675,41 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped } } - void updateDeferredMetrics(String name, long processingTime) { + /** + * Update rpc metrics for deferred calls. + * @param call The Rpc Call + * @param name Rpc method name + * @param processingTime processing time of the call; callers pass a nanosecond delta. + */ + void updateDeferredMetrics(Call call, String name, long processingTime) { + long completionTimeNanos = Time.monotonicNowNanos(); + long arrivalTimeNanos = call.timestampNanos; + + ProcessingDetails details = call.getProcessingDetails(); + long waitTime = + details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit()); + long responseTime = + details.get(Timing.RESPONSE, rpcMetrics.getMetricsTimeUnit()); + rpcMetrics.addRpcLockWaitTime(waitTime); + rpcMetrics.addRpcProcessingTime(processingTime); + rpcMetrics.addRpcResponseTime(responseTime); rpcMetrics.addDeferredRpcProcessingTime(processingTime); rpcDetailedMetrics.addDeferredProcessingTime(name, processingTime); + // don't include lock wait for detailed metrics. + processingTime -= waitTime; + rpcDetailedMetrics.addProcessingTime(name, processingTime); + + // Overall processing time is from arrival to completion. 
+ long overallProcessingTime = rpcMetrics.getMetricsTimeUnit() + .convert(completionTimeNanos - arrivalTimeNanos, TimeUnit.NANOSECONDS); + rpcDetailedMetrics.addOverallProcessingTime(name, overallProcessingTime); + callQueue.addResponseTime(name, call, details); + if (isLogSlowRPC()) { + logSlowRpcCalls(name, call, details); + } + if (details.getReturnStatus() == RpcStatusProto.SUCCESS) { + rpcMetrics.incrRpcCallSuccesses(); + } } /** @@ -1243,6 +1282,7 @@ public Void run() throws Exception { } long startNanos = Time.monotonicNowNanos(); + CurCallStartNanos.set(startNanos); Writable value = null; ResponseParams responseParams = new ResponseParams(); @@ -1331,6 +1371,7 @@ void doResponse(Throwable t, RpcStatusProto status) throws IOException { * Send a deferred response, ignoring errors. */ private void sendDeferedResponse() { + long startNanos = Time.monotonicNowNanos(); try { connection.sendResponse(this); } catch (Exception e) { @@ -1342,6 +1383,8 @@ private void sendDeferedResponse() { .currentThread().getName() + ", CallId=" + callId + ", hostname=" + getHostAddress()); } + getProcessingDetails().set(Timing.RESPONSE, + Time.monotonicNowNanos() - startNanos, TimeUnit.NANOSECONDS); } @Override @@ -3220,6 +3263,7 @@ public void run() { } } finally { CurCall.set(null); + CurCallStartNanos.set(null); numInProcessHandler.decrementAndGet(); IOUtils.cleanupWithLogger(LOG, traceScope); if (call != null) { From 9170fad51f7ffd4e1034466d71026b4b36394f55 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Thu, 12 Dec 2024 13:51:50 +0800 Subject: [PATCH 2/7] fix checkstyle. 
--- .../java/org/apache/hadoop/ipc/ProtobufRpcEngine.java | 5 +++-- .../java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java | 10 +++++----- .../src/main/java/org/apache/hadoop/ipc/Server.java | 10 +++++----- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index dae7a2db52835..c1c28fcfc5949 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -399,12 +399,13 @@ public ProtobufRpcEngineCallbackImpl() { this.server = CURRENT_CALL_INFO.get().getServer(); this.call = Server.getCurCall().get(); this.methodName = CURRENT_CALL_INFO.get().getMethodName(); - this.callStartNanos = Server.getCurCallStartNanos().get(); + this.callStartNanos = Server.getCurCallStartnanos().get(); } private void updateProcessingDetails(Call call, long deltaNanos) { ProcessingDetails details = call.getProcessingDetails(); - call.getProcessingDetails().set(ProcessingDetails.Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS); + call.getProcessingDetails().set(ProcessingDetails.Timing.PROCESSING, deltaNanos, + TimeUnit.NANOSECONDS); deltaNanos -= details.get(ProcessingDetails.Timing.LOCKWAIT, TimeUnit.NANOSECONDS); deltaNanos -= details.get(ProcessingDetails.Timing.LOCKSHARED, TimeUnit.NANOSECONDS); deltaNanos -= details.get(ProcessingDetails.Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java index fdfc26e4cb011..d0e014b3e1306 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java +++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java @@ -432,18 +432,18 @@ static class ProtobufRpcEngineCallbackImpl this.server = CURRENT_CALL_INFO.get().getServer(); this.call = Server.getCurCall().get(); this.methodName = CURRENT_CALL_INFO.get().getMethodName(); - this.callStartNanos = Server.getCurCallStartNanos().get(); + this.callStartNanos = Server.getCurCallStartnanos().get(); } - private void updateProcessingDetails(Call call, long deltaNanos) { - ProcessingDetails details = call.getProcessingDetails(); - call.getProcessingDetails().set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS); + private void updateProcessingDetails(Call rpcCall, long deltaNanos) { + ProcessingDetails details = rpcCall.getProcessingDetails(); + rpcCall.getProcessingDetails().set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS); deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS); deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS); deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS); details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS); } - + @Override public void setResponse(Message message) { long deltaNanos = Time.monotonicNowNanos() - callStartNanos; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 4ef44fb5e9183..7ee315f7da062 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -352,7 +352,7 @@ public static Server get() { */ private static final ThreadLocal CurCall = new ThreadLocal(); - private static final ThreadLocal CurCallStartNanos = new ThreadLocal(); + private static final ThreadLocal CUR_CALL_STARTNANOS = new ThreadLocal(); /** @return Get the current call. 
*/ @VisibleForTesting @@ -360,8 +360,8 @@ public static ThreadLocal getCurCall() { return CurCall; } - public static ThreadLocal getCurCallStartNanos() { - return CurCallStartNanos; + public static ThreadLocal getCurCallStartnanos() { + return CUR_CALL_STARTNANOS; } /** @@ -1282,7 +1282,7 @@ public Void run() throws Exception { } long startNanos = Time.monotonicNowNanos(); - CurCallStartNanos.set(startNanos); + CUR_CALL_STARTNANOS.set(startNanos); Writable value = null; ResponseParams responseParams = new ResponseParams(); @@ -3263,7 +3263,7 @@ public void run() { } } finally { CurCall.set(null); - CurCallStartNanos.set(null); + CUR_CALL_STARTNANOS.set(null); numInProcessHandler.decrementAndGet(); IOUtils.cleanupWithLogger(LOG, traceScope); if (call != null) { From eaef2a1479b41229c982555540912341c49fe957 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Thu, 12 Dec 2024 16:26:08 +0800 Subject: [PATCH 3/7] add Ut --- .../ipc/TestProtoBufRpcServerHandoff.java | 61 +++++++++++++++++-- 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java index 4328655270921..aadc116ac4dfb 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ipc; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; @@ -26,6 +27,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.thirdparty.protobuf.BlockingService; import 
org.apache.hadoop.thirdparty.protobuf.RpcController; import org.apache.hadoop.thirdparty.protobuf.ServiceException; @@ -33,18 +35,28 @@ import org.apache.hadoop.ipc.protobuf.TestProtos; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcHandoffProto; import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; + public class TestProtoBufRpcServerHandoff { public static final Logger LOG = LoggerFactory.getLogger(TestProtoBufRpcServerHandoff.class); - @Test(timeout = 20000) - public void test() throws Exception { - Configuration conf = new Configuration(); + private static Configuration conf = null; + private static RPC.Server server = null; + private static InetSocketAddress address = null; + + @Before + public void setUp() throws IOException { + conf = new Configuration(); TestProtoBufRpcServerHandoffServer serverImpl = new TestProtoBufRpcServerHandoffServer(); @@ -53,7 +65,7 @@ public void test() throws Exception { RPC.setProtocolEngine(conf, TestProtoBufRpcServerHandoffProtocol.class, ProtobufRpcEngine2.class); - RPC.Server server = new RPC.Builder(conf) + server = new RPC.Builder(conf) .setProtocol(TestProtoBufRpcServerHandoffProtocol.class) .setInstance(blockingService) .setVerbose(true) @@ -61,10 +73,13 @@ public void test() throws Exception { .build(); server.start(); - InetSocketAddress address = server.getListenerAddress(); + address = server.getListenerAddress(); long serverStartTime = System.currentTimeMillis(); LOG.info("Server started at: " + address + " at time: " + serverStartTime); - + } + + @Test(timeout = 20000) + public void test() throws Exception { final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy( 
TestProtoBufRpcServerHandoffProtocol.class, 1, address, conf); @@ -93,6 +108,40 @@ public void test() throws Exception { } + @Test(timeout = 20000) + public void testHandoffMetrics() throws Exception { + final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy( + TestProtoBufRpcServerHandoffProtocol.class, 1, address, conf); + + ExecutorService executorService = Executors.newFixedThreadPool(2); + CompletionService completionService = + new ExecutorCompletionService( + executorService); + + completionService.submit(new ClientInvocationCallable(client, 5000L)); + completionService.submit(new ClientInvocationCallable(client, 5000L)); + + long submitTime = System.currentTimeMillis(); + Future future1 = completionService.take(); + Future future2 = completionService.take(); + + ClientInvocationCallable callable1 = future1.get(); + ClientInvocationCallable callable2 = future2.get(); + + LOG.info(callable1.toString()); + LOG.info(callable2.toString()); + + // Ensure the 5 second sleep responses are within a reasonable time of each + // other. 
+ Assert.assertTrue(Math.abs(callable1.endTime - callable2.endTime) < 2000L); + Assert.assertTrue(System.currentTimeMillis() - submitTime < 7000L); + + // Check rpcMetrics + MetricsRecordBuilder rb = getMetrics(server.rpcMetrics.name()); + assertCounterGt("DeferredRpcProcessingTimeNumOps", 1L, rb); + assertCounter("RpcProcessingTimeNumOps", 2L, rb); + } + private static class ClientInvocationCallable implements Callable { final TestProtoBufRpcServerHandoffProtocol client; From 238f2dd096fd16ab456818d1f08b9b9984fd625f Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Thu, 12 Dec 2024 16:28:02 +0800 Subject: [PATCH 4/7] remove unused import --- .../java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java index aadc116ac4dfb..5b561665548b9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java @@ -36,7 +36,6 @@ import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcHandoffProto; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From d425138d871201b25535ef908282f8790f655a7b Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Thu, 12 Dec 2024 17:55:44 +0800 Subject: [PATCH 5/7] fix checkstyle. 
--- .../main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index c1c28fcfc5949..3850d33f5121c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -402,9 +402,9 @@ public ProtobufRpcEngineCallbackImpl() { this.callStartNanos = Server.getCurCallStartnanos().get(); } - private void updateProcessingDetails(Call call, long deltaNanos) { - ProcessingDetails details = call.getProcessingDetails(); - call.getProcessingDetails().set(ProcessingDetails.Timing.PROCESSING, deltaNanos, + private void updateProcessingDetails(Call rpcCall, long deltaNanos) { + ProcessingDetails details = rpcCall.getProcessingDetails(); + rpcCall.getProcessingDetails().set(ProcessingDetails.Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS); deltaNanos -= details.get(ProcessingDetails.Timing.LOCKWAIT, TimeUnit.NANOSECONDS); deltaNanos -= details.get(ProcessingDetails.Timing.LOCKSHARED, TimeUnit.NANOSECONDS); From 4900b0d0bdd1eea6d997e3fb9c48a0593e58548b Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Fri, 13 Dec 2024 11:03:30 +0800 Subject: [PATCH 6/7] make start run timestamp as call's field. 
--- .../org/apache/hadoop/ipc/ProtobufRpcEngine2.java | 6 ++---- .../src/main/java/org/apache/hadoop/ipc/Server.java | 12 +++++++++++- .../hadoop/ipc/TestProtoBufRpcServerHandoff.java | 2 +- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java index d0e014b3e1306..f60590ec38ea8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java @@ -426,13 +426,11 @@ static class ProtobufRpcEngineCallbackImpl private final RPC.Server server; private final Call call; private final String methodName; - private final long callStartNanos; ProtobufRpcEngineCallbackImpl() { this.server = CURRENT_CALL_INFO.get().getServer(); this.call = Server.getCurCall().get(); this.methodName = CURRENT_CALL_INFO.get().getMethodName(); - this.callStartNanos = Server.getCurCallStartnanos().get(); } private void updateProcessingDetails(Call rpcCall, long deltaNanos) { @@ -446,7 +444,7 @@ private void updateProcessingDetails(Call rpcCall, long deltaNanos) { @Override public void setResponse(Message message) { - long deltaNanos = Time.monotonicNowNanos() - callStartNanos; + long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos(); updateProcessingDetails(call, deltaNanos); call.setDeferredResponse(RpcWritable.wrap(message)); server.updateDeferredMetrics(call, methodName, deltaNanos); @@ -454,7 +452,7 @@ public void setResponse(Message message) { @Override public void error(Throwable t) { - long deltaNanos = Time.monotonicNowNanos() - callStartNanos; + long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos(); updateProcessingDetails(call, deltaNanos); call.setDeferredError(t); String detailedMetricsName = 
t.getClass().getSimpleName(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 7ee315f7da062..e432bac3b2b97 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -1002,6 +1002,7 @@ public static class Call implements Schedulable, final int callId; // the client's call id final int retryCount; // the retry count of the call private final long timestampNanos; // time the call was received + protected long startHandleTimestampNanos; // time the call was run long responseTimestampNanos; // time the call was served private AtomicInteger responseWaitCount = new AtomicInteger(1); final RPC.RpcKind rpcKind; @@ -1206,6 +1207,15 @@ public void setDeferredError(Throwable t) { public long getTimestampNanos() { return timestampNanos; } + + + public long getStartHandleTimestampNanos() { + return startHandleTimestampNanos; + } + + public void setStartHandleTimestampNanos(long startHandleTimestampNanos) { + this.startHandleTimestampNanos = startHandleTimestampNanos; + } } /** A RPC extended call queued for handling. 
*/ @@ -1282,7 +1292,7 @@ public Void run() throws Exception { } long startNanos = Time.monotonicNowNanos(); - CUR_CALL_STARTNANOS.set(startNanos); + this.setStartHandleTimestampNanos(startNanos); Writable value = null; ResponseParams responseParams = new ResponseParams(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java index 5b561665548b9..0ae2d37d1ad1f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java @@ -76,7 +76,7 @@ public void setUp() throws IOException { long serverStartTime = System.currentTimeMillis(); LOG.info("Server started at: " + address + " at time: " + serverStartTime); } - + @Test(timeout = 20000) public void test() throws Exception { final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy( From f8b9f0d6474e8ddd894008f2862e7822aad684bd Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Fri, 13 Dec 2024 15:27:15 +0800 Subject: [PATCH 7/7] remove CUR_CALL_STARTNANOS --- .../main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java | 6 ++---- .../src/main/java/org/apache/hadoop/ipc/Server.java | 7 ------- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 3850d33f5121c..239437854dda5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -393,13 +393,11 @@ static class ProtobufRpcEngineCallbackImpl private final RPC.Server server; 
private final Call call; private final String methodName; - private final long callStartNanos; public ProtobufRpcEngineCallbackImpl() { this.server = CURRENT_CALL_INFO.get().getServer(); this.call = Server.getCurCall().get(); this.methodName = CURRENT_CALL_INFO.get().getMethodName(); - this.callStartNanos = Server.getCurCallStartnanos().get(); } private void updateProcessingDetails(Call rpcCall, long deltaNanos) { @@ -414,7 +412,7 @@ private void updateProcessingDetails(Call rpcCall, long deltaNanos) { @Override public void setResponse(Message message) { - long deltaNanos = Time.monotonicNowNanos() - callStartNanos; + long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos(); updateProcessingDetails(call, deltaNanos); call.setDeferredResponse(RpcWritable.wrap(message)); server.updateDeferredMetrics(call, methodName, deltaNanos); @@ -422,7 +420,7 @@ public void setResponse(Message message) { @Override public void error(Throwable t) { - long deltaNanos = Time.monotonicNowNanos() - callStartNanos; + long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos(); updateProcessingDetails(call, deltaNanos); call.setDeferredError(t); String detailedMetricsName = t.getClass().getSimpleName(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index e432bac3b2b97..0dc438cca3ea7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -352,18 +352,12 @@ public static Server get() { */ private static final ThreadLocal CurCall = new ThreadLocal(); - private static final ThreadLocal CUR_CALL_STARTNANOS = new ThreadLocal(); - /** @return Get the current call. 
*/ @VisibleForTesting public static ThreadLocal getCurCall() { return CurCall; } - public static ThreadLocal getCurCallStartnanos() { - return CUR_CALL_STARTNANOS; - } - /** * Returns the currently active RPC call's sequential ID number. A negative * call ID indicates an invalid value, such as if there is no currently active @@ -3273,7 +3267,6 @@ public void run() { } } finally { CurCall.set(null); - CUR_CALL_STARTNANOS.set(null); numInProcessHandler.decrementAndGet(); IOUtils.cleanupWithLogger(LOG, traceScope); if (call != null) {