From 582021e45d9441a907e4470029b8b559ce65f3ab Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Fri, 24 Nov 2023 13:14:09 -0800 Subject: [PATCH] fix: Fix PayloadProcessor response payload race condition (#120) Prevent incorrectly empty logged payloads: - Do not attempt to avoid slicing the response bytebuf in the case that a PayloadProcessor is configured - Do not attempt to avoid some additional refcount updates in the case status != OK Resolves #111 ----- Signed-off-by: Nick Hill --- .../ibm/watson/modelmesh/ModelMeshApi.java | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java index 715c0efe..ad91ac11 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java @@ -725,6 +725,7 @@ public void onHalfClose() { String vModelId = null; String requestId = null; ModelResponse response = null; + ByteBuf responsePayload = null; try (InterruptingListener cancelListener = newInterruptingListener()) { if (logHeaders != null) { logHeaders.addToMDC(headers); // MDC cleared in finally block @@ -767,18 +768,20 @@ public void onHalfClose() { } finally { if (payloadProcessor != null) { processPayload(reqMessage.readerIndex(reqReaderIndex), - requestId, resolvedModelId, methodName, headers, null, true); + requestId, resolvedModelId, methodName, headers, null); } else { releaseReqMessage(); } reqMessage = null; // ownership released or transferred } - respReaderIndex = response.data.readerIndex(); respSize = response.data.readableBytes(); call.sendHeaders(response.metadata); + if (payloadProcessor != null) { + responsePayload = response.data.retainedSlice(); + } call.sendMessage(response.data); - // response is released via ReleaseAfterResponse.releaseAll() + // final response refcount is released via ReleaseAfterResponse.releaseAll() status = OK; } catch (Exception e) { status = toStatus(e); @@ -795,17 +798,13 @@ public void onHalfClose() { evictMethodDescriptor(methodName); } } finally { - final boolean releaseResponse = status != OK; if (payloadProcessor != null) { - ByteBuf data = null; - Metadata metadata = null; - if (response != null) { - data = response.data.readerIndex(respReaderIndex); - metadata = response.metadata; - } - processPayload(data, requestId, resolvedModelId, methodName, metadata, status, releaseResponse); - } else if (releaseResponse && response != null) { - response.release(); + Metadata metadata = response != null ? response.metadata : null; + processPayload(responsePayload, requestId, resolvedModelId, methodName, metadata, status); + } + if (status != OK && response != null) { + // An additional release is required if we call.sendMessage() wasn't sucessful + response.data.release(); } ReleaseAfterResponse.releaseAll(); clearThreadLocals(); @@ -820,23 +819,21 @@ public void onHalfClose() { } /** - * Invoke PayloadProcessor on the request/response data + * Invoke PayloadProcessor on the request/response data. This method takes ownership + * of the passed-in {@code ByteBuf}. + * * @param data the binary data * @param payloadId the id of the request * @param modelId the id of the model * @param methodName the name of the invoked method * @param metadata the method name metadata * @param status null for requests, non-null for responses - * @param takeOwnership whether the processor should take ownership */ private void processPayload(ByteBuf data, String payloadId, String modelId, String methodName, - Metadata metadata, io.grpc.Status status, boolean takeOwnership) { + Metadata metadata, io.grpc.Status status) { Payload payload = null; try { assert payloadProcessor != null; - if (!takeOwnership) { - ReferenceCountUtil.retain(data); - } payload = new Payload(payloadId, modelId, methodName, metadata, data, status); if (payloadProcessor.process(payload)) { data = null; // ownership transferred