-
Notifications
You must be signed in to change notification settings - Fork 8.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HADOOP-19362. RPC metrics should be updated correctly when call is defered. #7224
base: trunk
Are you sure you want to change the base?
Changes from 6 commits
e201bfc
9170fad
eaef2a1
238f2dd
d425138
4900b0d
f8b9f0d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -351,13 +351,19 @@ public static Server get() { | |
* after the call returns. | ||
*/ | ||
private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>(); | ||
|
||
|
||
private static final ThreadLocal<Long> CUR_CALL_STARTNANOS = new ThreadLocal<Long>(); | ||
|
||
/** @return Get the current call. */ | ||
@VisibleForTesting | ||
public static ThreadLocal<Call> getCurCall() { | ||
return CurCall; | ||
} | ||
|
||
|
||
public static ThreadLocal<Long> getCurCallStartnanos() { | ||
return CUR_CALL_STARTNANOS; | ||
} | ||
|
||
/** | ||
* Returns the currently active RPC call's sequential ID number. A negative | ||
* call ID indicates an invalid value, such as if there is no currently active | ||
|
@@ -638,7 +644,8 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped | |
rpcMetrics.addRpcQueueTime(queueTime); | ||
|
||
if (call.isResponseDeferred() || connDropped) { | ||
// call was skipped; don't include it in processing metrics | ||
// The call was skipped; don't include it in processing metrics. | ||
// Will update metrics in method updateDeferredMetrics. | ||
return; | ||
} | ||
|
||
|
@@ -668,9 +675,41 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped | |
} | ||
} | ||
|
||
void updateDeferredMetrics(String name, long processingTime) { | ||
/** | ||
* Update rpc metrics for defered calls. | ||
* @param call The Rpc Call | ||
* @param name Rpc method name | ||
* @param processingTime processing call in ms unit. | ||
*/ | ||
void updateDeferredMetrics(Call call, String name, long processingTime) { | ||
long completionTimeNanos = Time.monotonicNowNanos(); | ||
long arrivalTimeNanos = call.timestampNanos; | ||
|
||
ProcessingDetails details = call.getProcessingDetails(); | ||
long waitTime = | ||
details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit()); | ||
long responseTime = | ||
details.get(Timing.RESPONSE, rpcMetrics.getMetricsTimeUnit()); | ||
rpcMetrics.addRpcLockWaitTime(waitTime); | ||
rpcMetrics.addRpcProcessingTime(processingTime); | ||
rpcMetrics.addRpcResponseTime(responseTime); | ||
rpcMetrics.addDeferredRpcProcessingTime(processingTime); | ||
rpcDetailedMetrics.addDeferredProcessingTime(name, processingTime); | ||
// don't include lock wait for detailed metrics. | ||
processingTime -= waitTime; | ||
rpcDetailedMetrics.addProcessingTime(name, processingTime); | ||
|
||
// Overall processing time is from arrival to completion. | ||
long overallProcessingTime = rpcMetrics.getMetricsTimeUnit() | ||
.convert(completionTimeNanos - arrivalTimeNanos, TimeUnit.NANOSECONDS); | ||
rpcDetailedMetrics.addOverallProcessingTime(name, overallProcessingTime); | ||
callQueue.addResponseTime(name, call, details); | ||
if (isLogSlowRPC()) { | ||
logSlowRpcCalls(name, call, details); | ||
} | ||
if (details.getReturnStatus() == RpcStatusProto.SUCCESS) { | ||
rpcMetrics.incrRpcCallSuccesses(); | ||
} | ||
Comment on lines
+677
to
+706
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is some duplicate code in this method and updateMetrics; we can extract the common code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @KeeProMise Sir, thanks for your reviewing. This is a good suggestion, but i am worrying about the code readability will not be good. What's your opinion? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @hfutatzhanghb got it. |
||
} | ||
|
||
/** | ||
|
@@ -963,6 +1002,7 @@ public static class Call implements Schedulable, | |
final int callId; // the client's call id | ||
final int retryCount; // the retry count of the call | ||
private final long timestampNanos; // time the call was received | ||
protected long startHandleTimestampNanos; // time the call was run | ||
long responseTimestampNanos; // time the call was served | ||
private AtomicInteger responseWaitCount = new AtomicInteger(1); | ||
final RPC.RpcKind rpcKind; | ||
|
@@ -1167,6 +1207,15 @@ public void setDeferredError(Throwable t) { | |
public long getTimestampNanos() { | ||
return timestampNanos; | ||
} | ||
|
||
|
||
public long getStartHandleTimestampNanos() { | ||
return startHandleTimestampNanos; | ||
} | ||
|
||
public void setStartHandleTimestampNanos(long startHandleTimestampNanos) { | ||
this.startHandleTimestampNanos = startHandleTimestampNanos; | ||
} | ||
} | ||
|
||
/** A RPC extended call queued for handling. */ | ||
|
@@ -1243,6 +1292,7 @@ public Void run() throws Exception { | |
} | ||
|
||
long startNanos = Time.monotonicNowNanos(); | ||
this.setStartHandleTimestampNanos(startNanos); | ||
Writable value = null; | ||
ResponseParams responseParams = new ResponseParams(); | ||
|
||
|
@@ -1331,6 +1381,7 @@ void doResponse(Throwable t, RpcStatusProto status) throws IOException { | |
* Send a deferred response, ignoring errors. | ||
*/ | ||
private void sendDeferedResponse() { | ||
long startNanos = Time.monotonicNowNanos(); | ||
try { | ||
connection.sendResponse(this); | ||
} catch (Exception e) { | ||
|
@@ -1342,6 +1393,8 @@ private void sendDeferedResponse() { | |
.currentThread().getName() + ", CallId=" | ||
+ callId + ", hostname=" + getHostAddress()); | ||
} | ||
getProcessingDetails().set(Timing.RESPONSE, | ||
Time.monotonicNowNanos() - startNanos, TimeUnit.NANOSECONDS); | ||
} | ||
|
||
@Override | ||
|
@@ -3220,6 +3273,7 @@ public void run() { | |
} | ||
} finally { | ||
CurCall.set(null); | ||
CUR_CALL_STARTNANOS.set(null); | ||
numInProcessHandler.decrementAndGet(); | ||
IOUtils.cleanupWithLogger(LOG, traceScope); | ||
if (call != null) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CUR_CALL_STARTNANOS Can it be deleted directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, Have fixed~ Thanks a lot.