Skip to content

Commit

Permalink
HADOOP-19362. RPC metrics should be updated correctly when call is de…
Browse files Browse the repository at this point in the history
…fered.
  • Loading branch information
hfutatzhanghb committed Dec 12, 2024
1 parent efb83ec commit 9bee480
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
Expand All @@ -34,7 +35,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.tracing.TraceScope;
import org.apache.hadoop.tracing.Tracer;
import org.apache.hadoop.util.Time;
Expand Down Expand Up @@ -393,28 +393,39 @@ static class ProtobufRpcEngineCallbackImpl
private final RPC.Server server;
private final Call call;
private final String methodName;
private final long setupTime;
private final long callStartNanos;

public ProtobufRpcEngineCallbackImpl() {
this.server = CURRENT_CALL_INFO.get().getServer();
this.call = Server.getCurCall().get();
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
this.setupTime = Time.now();
this.callStartNanos = Server.getCurCallStartNanos().get();
}

private void updateProcessingDetails(Call call, long deltaNanos) {
ProcessingDetails details = call.getProcessingDetails();
call.getProcessingDetails().set(ProcessingDetails.Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(ProcessingDetails.Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(ProcessingDetails.Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(ProcessingDetails.Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
details.set(ProcessingDetails.Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
}

@Override
public void setResponse(Message message) {
long processingTime = Time.now() - setupTime;
long deltaNanos = Time.monotonicNowNanos() - callStartNanos;
updateProcessingDetails(call, deltaNanos);
call.setDeferredResponse(RpcWritable.wrap(message));
server.updateDeferredMetrics(methodName, processingTime);
server.updateDeferredMetrics(call, methodName, deltaNanos);
}

@Override
public void error(Throwable t) {
long processingTime = Time.now() - setupTime;
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(detailedMetricsName, processingTime);
long deltaNanos = Time.monotonicNowNanos() - callStartNanos;
updateProcessingDetails(call, deltaNanos);
call.setDeferredError(t);
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.ProcessingDetails.Timing;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngine2Protos.RequestHeaderProto;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -425,28 +426,39 @@ static class ProtobufRpcEngineCallbackImpl
private final RPC.Server server;
private final Call call;
private final String methodName;
private final long setupTime;
private final long callStartNanos;

ProtobufRpcEngineCallbackImpl() {
this.server = CURRENT_CALL_INFO.get().getServer();
this.call = Server.getCurCall().get();
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
this.setupTime = Time.now();
this.callStartNanos = Server.getCurCallStartNanos().get();
}

private void updateProcessingDetails(Call call, long deltaNanos) {
ProcessingDetails details = call.getProcessingDetails();
call.getProcessingDetails().set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
}

Check failure on line 446 in hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java#L446

blanks: end of line
@Override
public void setResponse(Message message) {
long processingTime = Time.now() - setupTime;
long deltaNanos = Time.monotonicNowNanos() - callStartNanos;
updateProcessingDetails(call, deltaNanos);
call.setDeferredResponse(RpcWritable.wrap(message));
server.updateDeferredMetrics(methodName, processingTime);
server.updateDeferredMetrics(call, methodName, deltaNanos);
}

@Override
public void error(Throwable t) {
long processingTime = Time.now() - setupTime;
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(detailedMetricsName, processingTime);
long deltaNanos = Time.monotonicNowNanos() - callStartNanos;
updateProcessingDetails(call, deltaNanos);
call.setDeferredError(t);
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,19 @@ public static Server get() {
* after the call returns.
*/
private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();


private static final ThreadLocal<Long> CurCallStartNanos = new ThreadLocal<Long>();

/** @return Get the current call. */
@VisibleForTesting
public static ThreadLocal<Call> getCurCall() {
return CurCall;
}


public static ThreadLocal<Long> getCurCallStartNanos() {
return CurCallStartNanos;
}

/**
* Returns the currently active RPC call's sequential ID number. A negative
* call ID indicates an invalid value, such as if there is no currently active
Expand Down Expand Up @@ -638,7 +644,8 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped
rpcMetrics.addRpcQueueTime(queueTime);

if (call.isResponseDeferred() || connDropped) {
// call was skipped; don't include it in processing metrics
// The call was skipped; don't include it in processing metrics.
// Will update metrics in method updateDeferredMetrics.
return;
}

Expand Down Expand Up @@ -668,9 +675,41 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped
}
}

void updateDeferredMetrics(String name, long processingTime) {
/**
* Update rpc metrics for defered calls.
* @param call The Rpc Call
* @param name Rpc method name
* @param processingTime processing call in ms unit.
*/
void updateDeferredMetrics(Call call, String name, long processingTime) {
long completionTimeNanos = Time.monotonicNowNanos();
long arrivalTimeNanos = call.timestampNanos;

ProcessingDetails details = call.getProcessingDetails();
long waitTime =
details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit());
long responseTime =
details.get(Timing.RESPONSE, rpcMetrics.getMetricsTimeUnit());
rpcMetrics.addRpcLockWaitTime(waitTime);
rpcMetrics.addRpcProcessingTime(processingTime);
rpcMetrics.addRpcResponseTime(responseTime);
rpcMetrics.addDeferredRpcProcessingTime(processingTime);
rpcDetailedMetrics.addDeferredProcessingTime(name, processingTime);
// don't include lock wait for detailed metrics.
processingTime -= waitTime;
rpcDetailedMetrics.addProcessingTime(name, processingTime);

// Overall processing time is from arrival to completion.
long overallProcessingTime = rpcMetrics.getMetricsTimeUnit()
.convert(completionTimeNanos - arrivalTimeNanos, TimeUnit.NANOSECONDS);
rpcDetailedMetrics.addOverallProcessingTime(name, overallProcessingTime);
callQueue.addResponseTime(name, call, details);
if (isLogSlowRPC()) {
logSlowRpcCalls(name, call, details);
}
if (details.getReturnStatus() == RpcStatusProto.SUCCESS) {
rpcMetrics.incrRpcCallSuccesses();
}
}

/**
Expand Down Expand Up @@ -1243,6 +1282,8 @@ public Void run() throws Exception {
}

long startNanos = Time.monotonicNowNanos();
// TODO ZHB 这个用来统计processing耗时
CurCallStartNanos.set(startNanos);
Writable value = null;
ResponseParams responseParams = new ResponseParams();

Expand Down Expand Up @@ -1331,6 +1372,7 @@ void doResponse(Throwable t, RpcStatusProto status) throws IOException {
* Send a deferred response, ignoring errors.
*/
private void sendDeferedResponse() {
long startNanos = Time.monotonicNowNanos();
try {
connection.sendResponse(this);
} catch (Exception e) {
Expand All @@ -1342,6 +1384,8 @@ private void sendDeferedResponse() {
.currentThread().getName() + ", CallId="
+ callId + ", hostname=" + getHostAddress());
}
getProcessingDetails().set(Timing.RESPONSE,
Time.monotonicNowNanos() - startNanos, TimeUnit.NANOSECONDS);
}

@Override
Expand Down Expand Up @@ -3220,6 +3264,7 @@ public void run() {
}
} finally {
CurCall.set(null);
CurCallStartNanos.set(null);
numInProcessHandler.decrementAndGet();
IOUtils.cleanupWithLogger(LOG, traceScope);
if (call != null) {
Expand Down

0 comments on commit 9bee480

Please sign in to comment.