Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19362. RPC metrics should be updated correctly when call is defered. #7224

Open
wants to merge 7 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
Expand All @@ -34,7 +35,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.tracing.TraceScope;
import org.apache.hadoop.tracing.Tracer;
import org.apache.hadoop.util.Time;
Expand Down Expand Up @@ -393,28 +393,38 @@ static class ProtobufRpcEngineCallbackImpl
private final RPC.Server server;
private final Call call;
private final String methodName;
private final long setupTime;

public ProtobufRpcEngineCallbackImpl() {
this.server = CURRENT_CALL_INFO.get().getServer();
this.call = Server.getCurCall().get();
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
this.setupTime = Time.now();
}

private void updateProcessingDetails(Call rpcCall, long deltaNanos) {
ProcessingDetails details = rpcCall.getProcessingDetails();
rpcCall.getProcessingDetails().set(ProcessingDetails.Timing.PROCESSING, deltaNanos,
TimeUnit.NANOSECONDS);
deltaNanos -= details.get(ProcessingDetails.Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(ProcessingDetails.Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(ProcessingDetails.Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
details.set(ProcessingDetails.Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
}

@Override
public void setResponse(Message message) {
long processingTime = Time.now() - setupTime;
long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
updateProcessingDetails(call, deltaNanos);
call.setDeferredResponse(RpcWritable.wrap(message));
server.updateDeferredMetrics(methodName, processingTime);
server.updateDeferredMetrics(call, methodName, deltaNanos);
}

@Override
public void error(Throwable t) {
long processingTime = Time.now() - setupTime;
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(detailedMetricsName, processingTime);
long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
updateProcessingDetails(call, deltaNanos);
call.setDeferredError(t);
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.ProcessingDetails.Timing;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngine2Protos.RequestHeaderProto;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -425,28 +426,37 @@ static class ProtobufRpcEngineCallbackImpl
private final RPC.Server server;
private final Call call;
private final String methodName;
private final long setupTime;

ProtobufRpcEngineCallbackImpl() {
this.server = CURRENT_CALL_INFO.get().getServer();
this.call = Server.getCurCall().get();
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
this.setupTime = Time.now();
}

private void updateProcessingDetails(Call rpcCall, long deltaNanos) {
ProcessingDetails details = rpcCall.getProcessingDetails();
rpcCall.getProcessingDetails().set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
}

@Override
public void setResponse(Message message) {
long processingTime = Time.now() - setupTime;
long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
updateProcessingDetails(call, deltaNanos);
call.setDeferredResponse(RpcWritable.wrap(message));
server.updateDeferredMetrics(methodName, processingTime);
server.updateDeferredMetrics(call, methodName, deltaNanos);
}

@Override
public void error(Throwable t) {
long processingTime = Time.now() - setupTime;
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(detailedMetricsName, processingTime);
long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
updateProcessingDetails(call, deltaNanos);
call.setDeferredError(t);
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,13 @@ public static Server get() {
* after the call returns.
*/
private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();

/** @return Get the current call. */
@VisibleForTesting
public static ThreadLocal<Call> getCurCall() {
return CurCall;
}

/**
* Returns the currently active RPC call's sequential ID number. A negative
* call ID indicates an invalid value, such as if there is no currently active
Expand Down Expand Up @@ -638,7 +638,8 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped
rpcMetrics.addRpcQueueTime(queueTime);

if (call.isResponseDeferred() || connDropped) {
// call was skipped; don't include it in processing metrics
// The call was skipped; don't include it in processing metrics.
// Will update metrics in method updateDeferredMetrics.
return;
}

Expand Down Expand Up @@ -668,9 +669,41 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped
}
}

void updateDeferredMetrics(String name, long processingTime) {
/**
* Update rpc metrics for defered calls.
* @param call The Rpc Call
* @param name Rpc method name
* @param processingTime processing call in ms unit.
*/
void updateDeferredMetrics(Call call, String name, long processingTime) {
long completionTimeNanos = Time.monotonicNowNanos();
long arrivalTimeNanos = call.timestampNanos;

ProcessingDetails details = call.getProcessingDetails();
long waitTime =
details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit());
long responseTime =
details.get(Timing.RESPONSE, rpcMetrics.getMetricsTimeUnit());
rpcMetrics.addRpcLockWaitTime(waitTime);
rpcMetrics.addRpcProcessingTime(processingTime);
rpcMetrics.addRpcResponseTime(responseTime);
rpcMetrics.addDeferredRpcProcessingTime(processingTime);
rpcDetailedMetrics.addDeferredProcessingTime(name, processingTime);
// don't include lock wait for detailed metrics.
processingTime -= waitTime;
rpcDetailedMetrics.addProcessingTime(name, processingTime);

// Overall processing time is from arrival to completion.
long overallProcessingTime = rpcMetrics.getMetricsTimeUnit()
.convert(completionTimeNanos - arrivalTimeNanos, TimeUnit.NANOSECONDS);
rpcDetailedMetrics.addOverallProcessingTime(name, overallProcessingTime);
callQueue.addResponseTime(name, call, details);
if (isLogSlowRPC()) {
logSlowRpcCalls(name, call, details);
}
if (details.getReturnStatus() == RpcStatusProto.SUCCESS) {
rpcMetrics.incrRpcCallSuccesses();
}
Comment on lines +677 to +706
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is some duplicate code in this method and updateMetrics; we can extract the common code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KeeProMise Sir, thanks for your reviewing. This is a good suggestion, but i am worrying about the code readability will not be good. What's your opinion?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hfutatzhanghb got it.

}

/**
Expand Down Expand Up @@ -963,6 +996,7 @@ public static class Call implements Schedulable,
final int callId; // the client's call id
final int retryCount; // the retry count of the call
private final long timestampNanos; // time the call was received
protected long startHandleTimestampNanos; // time the call was run
long responseTimestampNanos; // time the call was served
private AtomicInteger responseWaitCount = new AtomicInteger(1);
final RPC.RpcKind rpcKind;
Expand Down Expand Up @@ -1167,6 +1201,15 @@ public void setDeferredError(Throwable t) {
public long getTimestampNanos() {
return timestampNanos;
}


public long getStartHandleTimestampNanos() {
return startHandleTimestampNanos;
}

public void setStartHandleTimestampNanos(long startHandleTimestampNanos) {
this.startHandleTimestampNanos = startHandleTimestampNanos;
}
}

/** A RPC extended call queued for handling. */
Expand Down Expand Up @@ -1243,6 +1286,7 @@ public Void run() throws Exception {
}

long startNanos = Time.monotonicNowNanos();
this.setStartHandleTimestampNanos(startNanos);
Writable value = null;
ResponseParams responseParams = new ResponseParams();

Expand Down Expand Up @@ -1331,6 +1375,7 @@ void doResponse(Throwable t, RpcStatusProto status) throws IOException {
* Send a deferred response, ignoring errors.
*/
private void sendDeferedResponse() {
long startNanos = Time.monotonicNowNanos();
try {
connection.sendResponse(this);
} catch (Exception e) {
Expand All @@ -1342,6 +1387,8 @@ private void sendDeferedResponse() {
.currentThread().getName() + ", CallId="
+ callId + ", hostname=" + getHostAddress());
}
getProcessingDetails().set(Timing.RESPONSE,
Time.monotonicNowNanos() - startNanos, TimeUnit.NANOSECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
Expand All @@ -26,25 +27,35 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcHandoffProto;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

public class TestProtoBufRpcServerHandoff {

public static final Logger LOG =
LoggerFactory.getLogger(TestProtoBufRpcServerHandoff.class);

@Test(timeout = 20000)
public void test() throws Exception {
Configuration conf = new Configuration();
private static Configuration conf = null;
private static RPC.Server server = null;
private static InetSocketAddress address = null;

@Before
public void setUp() throws IOException {
conf = new Configuration();

TestProtoBufRpcServerHandoffServer serverImpl =
new TestProtoBufRpcServerHandoffServer();
Expand All @@ -53,18 +64,21 @@ public void test() throws Exception {

RPC.setProtocolEngine(conf, TestProtoBufRpcServerHandoffProtocol.class,
ProtobufRpcEngine2.class);
RPC.Server server = new RPC.Builder(conf)
server = new RPC.Builder(conf)
.setProtocol(TestProtoBufRpcServerHandoffProtocol.class)
.setInstance(blockingService)
.setVerbose(true)
.setNumHandlers(1) // Num Handlers explicitly set to 1 for test.
.build();
server.start();

InetSocketAddress address = server.getListenerAddress();
address = server.getListenerAddress();
long serverStartTime = System.currentTimeMillis();
LOG.info("Server started at: " + address + " at time: " + serverStartTime);
}

@Test(timeout = 20000)
public void test() throws Exception {
final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy(
TestProtoBufRpcServerHandoffProtocol.class, 1, address, conf);

Expand Down Expand Up @@ -93,6 +107,40 @@ public void test() throws Exception {

}

@Test(timeout = 20000)
public void testHandoffMetrics() throws Exception {
final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy(
TestProtoBufRpcServerHandoffProtocol.class, 1, address, conf);

ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletionService<ClientInvocationCallable> completionService =
new ExecutorCompletionService<ClientInvocationCallable>(
executorService);

completionService.submit(new ClientInvocationCallable(client, 5000L));
completionService.submit(new ClientInvocationCallable(client, 5000L));

long submitTime = System.currentTimeMillis();
Future<ClientInvocationCallable> future1 = completionService.take();
Future<ClientInvocationCallable> future2 = completionService.take();

ClientInvocationCallable callable1 = future1.get();
ClientInvocationCallable callable2 = future2.get();

LOG.info(callable1.toString());
LOG.info(callable2.toString());

// Ensure the 5 second sleep responses are within a reasonable time of each
// other.
Assert.assertTrue(Math.abs(callable1.endTime - callable2.endTime) < 2000L);
Assert.assertTrue(System.currentTimeMillis() - submitTime < 7000L);

// Check rpcMetrics
MetricsRecordBuilder rb = getMetrics(server.rpcMetrics.name());
assertCounterGt("DeferredRpcProcessingTimeNumOps", 1L, rb);
assertCounter("RpcProcessingTimeNumOps", 2L, rb);
}

private static class ClientInvocationCallable
implements Callable<ClientInvocationCallable> {
final TestProtoBufRpcServerHandoffProtocol client;
Expand Down
Loading