Skip to content

Commit

Permalink
fix: Fix PayloadProcessor response payload race condition (#120)
Browse files Browse the repository at this point in the history
Prevent incorrectly empty logged payloads:
- Do not attempt to avoid slicing the response bytebuf in the case that a
  PayloadProcessor is configured
- Do not attempt to avoid some additional refcount updates in the case
  status != OK

Resolves #111

-----

Signed-off-by: Nick Hill <[email protected]>
  • Loading branch information
njhill authored Nov 24, 2023
1 parent a997686 commit 582021e
Showing 1 changed file with 16 additions and 19 deletions.
35 changes: 16 additions & 19 deletions src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,7 @@ public void onHalfClose() {
String vModelId = null;
String requestId = null;
ModelResponse response = null;
ByteBuf responsePayload = null;
try (InterruptingListener cancelListener = newInterruptingListener()) {
if (logHeaders != null) {
logHeaders.addToMDC(headers); // MDC cleared in finally block
Expand Down Expand Up @@ -767,18 +768,20 @@ public void onHalfClose() {
} finally {
if (payloadProcessor != null) {
processPayload(reqMessage.readerIndex(reqReaderIndex),
requestId, resolvedModelId, methodName, headers, null, true);
requestId, resolvedModelId, methodName, headers, null);
} else {
releaseReqMessage();
}
reqMessage = null; // ownership released or transferred
}

respReaderIndex = response.data.readerIndex();
respSize = response.data.readableBytes();
call.sendHeaders(response.metadata);
if (payloadProcessor != null) {
responsePayload = response.data.retainedSlice();
}
call.sendMessage(response.data);
// response is released via ReleaseAfterResponse.releaseAll()
// final response refcount is released via ReleaseAfterResponse.releaseAll()
status = OK;
} catch (Exception e) {
status = toStatus(e);
Expand All @@ -795,17 +798,13 @@ public void onHalfClose() {
evictMethodDescriptor(methodName);
}
} finally {
final boolean releaseResponse = status != OK;
if (payloadProcessor != null) {
ByteBuf data = null;
Metadata metadata = null;
if (response != null) {
data = response.data.readerIndex(respReaderIndex);
metadata = response.metadata;
}
processPayload(data, requestId, resolvedModelId, methodName, metadata, status, releaseResponse);
} else if (releaseResponse && response != null) {
response.release();
Metadata metadata = response != null ? response.metadata : null;
processPayload(responsePayload, requestId, resolvedModelId, methodName, metadata, status);
}
if (status != OK && response != null) {
// An additional release is required if we call.sendMessage() wasn't sucessful
response.data.release();
}
ReleaseAfterResponse.releaseAll();
clearThreadLocals();
Expand All @@ -820,23 +819,21 @@ public void onHalfClose() {
}

/**
* Invoke PayloadProcessor on the request/response data
* Invoke PayloadProcessor on the request/response data. This method takes ownership
* of the passed-in {@code ByteBuf}.
*
* @param data the binary data
* @param payloadId the id of the request
* @param modelId the id of the model
* @param methodName the name of the invoked method
* @param metadata the method name metadata
* @param status null for requests, non-null for responses
* @param takeOwnership whether the processor should take ownership
*/
private void processPayload(ByteBuf data, String payloadId, String modelId, String methodName,
Metadata metadata, io.grpc.Status status, boolean takeOwnership) {
Metadata metadata, io.grpc.Status status) {
Payload payload = null;
try {
assert payloadProcessor != null;
if (!takeOwnership) {
ReferenceCountUtil.retain(data);
}
payload = new Payload(payloadId, modelId, methodName, metadata, data, status);
if (payloadProcessor.process(payload)) {
data = null; // ownership transferred
Expand Down

0 comments on commit 582021e

Please sign in to comment.