diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index df0f734d08016..239437854dda5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.retry.RetryPolicy; @@ -34,7 +35,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.tracing.TraceScope; import org.apache.hadoop.tracing.Tracer; import org.apache.hadoop.util.Time; @@ -393,28 +393,38 @@ static class ProtobufRpcEngineCallbackImpl private final RPC.Server server; private final Call call; private final String methodName; - private final long setupTime; public ProtobufRpcEngineCallbackImpl() { this.server = CURRENT_CALL_INFO.get().getServer(); this.call = Server.getCurCall().get(); this.methodName = CURRENT_CALL_INFO.get().getMethodName(); - this.setupTime = Time.now(); + } + + private void updateProcessingDetails(Call rpcCall, long deltaNanos) { + ProcessingDetails details = rpcCall.getProcessingDetails(); + rpcCall.getProcessingDetails().set(ProcessingDetails.Timing.PROCESSING, deltaNanos, + TimeUnit.NANOSECONDS); + deltaNanos -= details.get(ProcessingDetails.Timing.LOCKWAIT, TimeUnit.NANOSECONDS); + deltaNanos -= details.get(ProcessingDetails.Timing.LOCKSHARED, TimeUnit.NANOSECONDS); + deltaNanos -= details.get(ProcessingDetails.Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS); + details.set(ProcessingDetails.Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS); } @Override public void setResponse(Message message) { - long processingTime = Time.now() - setupTime; + long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos(); + updateProcessingDetails(call, deltaNanos); call.setDeferredResponse(RpcWritable.wrap(message)); - server.updateDeferredMetrics(methodName, processingTime); + server.updateDeferredMetrics(call, methodName, deltaNanos); } @Override public void error(Throwable t) { - long processingTime = Time.now() - setupTime; - String detailedMetricsName = t.getClass().getSimpleName(); - server.updateDeferredMetrics(detailedMetricsName, processingTime); + long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos(); + updateProcessingDetails(call, deltaNanos); call.setDeferredError(t); + String detailedMetricsName = t.getClass().getSimpleName(); + server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java index bedecc8851d6a..f60590ec38ea8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java @@ -25,6 +25,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; +import org.apache.hadoop.ipc.ProcessingDetails.Timing; import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngine2Protos.RequestHeaderProto; import org.apache.hadoop.security.UserGroupInformation; @@ -425,28 +426,37 @@ static class ProtobufRpcEngineCallbackImpl private final RPC.Server server; private final Call call; private final String methodName; - private final long setupTime; ProtobufRpcEngineCallbackImpl() { this.server = CURRENT_CALL_INFO.get().getServer(); this.call = Server.getCurCall().get(); this.methodName = CURRENT_CALL_INFO.get().getMethodName(); - this.setupTime = Time.now(); + } + + private void updateProcessingDetails(Call rpcCall, long deltaNanos) { + ProcessingDetails details = rpcCall.getProcessingDetails(); + rpcCall.getProcessingDetails().set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS); + deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS); + deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS); + deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS); + details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS); } @Override public void setResponse(Message message) { - long processingTime = Time.now() - setupTime; + long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos(); + updateProcessingDetails(call, deltaNanos); call.setDeferredResponse(RpcWritable.wrap(message)); - server.updateDeferredMetrics(methodName, processingTime); + server.updateDeferredMetrics(call, methodName, deltaNanos); } @Override public void error(Throwable t) { - long processingTime = Time.now() - setupTime; - String detailedMetricsName = t.getClass().getSimpleName(); - server.updateDeferredMetrics(detailedMetricsName, processingTime); + long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos(); + updateProcessingDetails(call, deltaNanos); call.setDeferredError(t); + String detailedMetricsName = t.getClass().getSimpleName(); + server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 7303f8afce194..0dc438cca3ea7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -351,13 +351,13 @@ public static Server get() { * after the call returns. */ private static final ThreadLocal CurCall = new ThreadLocal(); - + /** @return Get the current call. */ @VisibleForTesting public static ThreadLocal getCurCall() { return CurCall; } - + /** * Returns the currently active RPC call's sequential ID number. A negative * call ID indicates an invalid value, such as if there is no currently active @@ -638,7 +638,8 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped rpcMetrics.addRpcQueueTime(queueTime); if (call.isResponseDeferred() || connDropped) { - // call was skipped; don't include it in processing metrics + // The call was skipped; don't include it in processing metrics. + // Will update metrics in method updateDeferredMetrics. return; } @@ -668,9 +669,41 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped } } - void updateDeferredMetrics(String name, long processingTime) { + /** + * Update rpc metrics for defered calls. + * @param call The Rpc Call + * @param name Rpc method name + * @param processingTime processing call in ms unit. + */ + void updateDeferredMetrics(Call call, String name, long processingTime) { + long completionTimeNanos = Time.monotonicNowNanos(); + long arrivalTimeNanos = call.timestampNanos; + + ProcessingDetails details = call.getProcessingDetails(); + long waitTime = + details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit()); + long responseTime = + details.get(Timing.RESPONSE, rpcMetrics.getMetricsTimeUnit()); + rpcMetrics.addRpcLockWaitTime(waitTime); + rpcMetrics.addRpcProcessingTime(processingTime); + rpcMetrics.addRpcResponseTime(responseTime); rpcMetrics.addDeferredRpcProcessingTime(processingTime); rpcDetailedMetrics.addDeferredProcessingTime(name, processingTime); + // don't include lock wait for detailed metrics. + processingTime -= waitTime; + rpcDetailedMetrics.addProcessingTime(name, processingTime); + + // Overall processing time is from arrival to completion. + long overallProcessingTime = rpcMetrics.getMetricsTimeUnit() + .convert(completionTimeNanos - arrivalTimeNanos, TimeUnit.NANOSECONDS); + rpcDetailedMetrics.addOverallProcessingTime(name, overallProcessingTime); + callQueue.addResponseTime(name, call, details); + if (isLogSlowRPC()) { + logSlowRpcCalls(name, call, details); + } + if (details.getReturnStatus() == RpcStatusProto.SUCCESS) { + rpcMetrics.incrRpcCallSuccesses(); + } } /** @@ -963,6 +996,7 @@ public static class Call implements Schedulable, final int callId; // the client's call id final int retryCount; // the retry count of the call private final long timestampNanos; // time the call was received + protected long startHandleTimestampNanos; // time the call was run long responseTimestampNanos; // time the call was served private AtomicInteger responseWaitCount = new AtomicInteger(1); final RPC.RpcKind rpcKind; @@ -1167,6 +1201,15 @@ public void setDeferredError(Throwable t) { public long getTimestampNanos() { return timestampNanos; } + + + public long getStartHandleTimestampNanos() { + return startHandleTimestampNanos; + } + + public void setStartHandleTimestampNanos(long startHandleTimestampNanos) { + this.startHandleTimestampNanos = startHandleTimestampNanos; + } } /** A RPC extended call queued for handling. */ @@ -1243,6 +1286,7 @@ public Void run() throws Exception { } long startNanos = Time.monotonicNowNanos(); + this.setStartHandleTimestampNanos(startNanos); Writable value = null; ResponseParams responseParams = new ResponseParams(); @@ -1331,6 +1375,7 @@ void doResponse(Throwable t, RpcStatusProto status) throws IOException { * Send a deferred response, ignoring errors. */ private void sendDeferedResponse() { + long startNanos = Time.monotonicNowNanos(); try { connection.sendResponse(this); } catch (Exception e) { @@ -1342,6 +1387,8 @@ private void sendDeferedResponse() { .currentThread().getName() + ", CallId=" + callId + ", hostname=" + getHostAddress()); } + getProcessingDetails().set(Timing.RESPONSE, + Time.monotonicNowNanos() - startNanos, TimeUnit.NANOSECONDS); } @Override diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java index 4328655270921..0ae2d37d1ad1f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ipc; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; @@ -26,6 +27,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.thirdparty.protobuf.BlockingService; import org.apache.hadoop.thirdparty.protobuf.RpcController; import org.apache.hadoop.thirdparty.protobuf.ServiceException; @@ -33,18 +35,27 @@ import org.apache.hadoop.ipc.protobuf.TestProtos; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcHandoffProto; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; + public class TestProtoBufRpcServerHandoff { public static final Logger LOG = LoggerFactory.getLogger(TestProtoBufRpcServerHandoff.class); - @Test(timeout = 20000) - public void test() throws Exception { - Configuration conf = new Configuration(); + private static Configuration conf = null; + private static RPC.Server server = null; + private static InetSocketAddress address = null; + + @Before + public void setUp() throws IOException { + conf = new Configuration(); TestProtoBufRpcServerHandoffServer serverImpl = new TestProtoBufRpcServerHandoffServer(); @@ -53,7 +64,7 @@ public void test() throws Exception { RPC.setProtocolEngine(conf, TestProtoBufRpcServerHandoffProtocol.class, ProtobufRpcEngine2.class); - RPC.Server server = new RPC.Builder(conf) + server = new RPC.Builder(conf) .setProtocol(TestProtoBufRpcServerHandoffProtocol.class) .setInstance(blockingService) .setVerbose(true) @@ -61,10 +72,13 @@ public void test() throws Exception { .build(); server.start(); - InetSocketAddress address = server.getListenerAddress(); + address = server.getListenerAddress(); long serverStartTime = System.currentTimeMillis(); LOG.info("Server started at: " + address + " at time: " + serverStartTime); + } + @Test(timeout = 20000) + public void test() throws Exception { final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy( TestProtoBufRpcServerHandoffProtocol.class, 1, address, conf); @@ -93,6 +107,40 @@ public void test() throws Exception { } + @Test(timeout = 20000) + public void testHandoffMetrics() throws Exception { + final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy( + TestProtoBufRpcServerHandoffProtocol.class, 1, address, conf); + + ExecutorService executorService = Executors.newFixedThreadPool(2); + CompletionService completionService = + new ExecutorCompletionService( + executorService); + + completionService.submit(new ClientInvocationCallable(client, 5000L)); + completionService.submit(new ClientInvocationCallable(client, 5000L)); + + long submitTime = System.currentTimeMillis(); + Future future1 = completionService.take(); + Future future2 = completionService.take(); + + ClientInvocationCallable callable1 = future1.get(); + ClientInvocationCallable callable2 = future2.get(); + + LOG.info(callable1.toString()); + LOG.info(callable2.toString()); + + // Ensure the 5 second sleep responses are within a reasonable time of each + // other. + Assert.assertTrue(Math.abs(callable1.endTime - callable2.endTime) < 2000L); + Assert.assertTrue(System.currentTimeMillis() - submitTime < 7000L); + + // Check rpcMetrics + MetricsRecordBuilder rb = getMetrics(server.rpcMetrics.name()); + assertCounterGt("DeferredRpcProcessingTimeNumOps", 1L, rb); + assertCounter("RpcProcessingTimeNumOps", 2L, rb); + } + private static class ClientInvocationCallable implements Callable { final TestProtoBufRpcServerHandoffProtocol client;