Skip to content

Commit

Permalink
Merge pull request wso2#2135 from malakaganga/warn_log_fix_x
Browse files Browse the repository at this point in the history
Fix SO timeout log issue when the worker pool is exhausted.
  • Loading branch information
malakaganga authored Jun 23, 2023
2 parents 1c0eba0 + 599fd45 commit b11ce26
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,14 @@ public class ClientWorker implements Runnable {
/** the axis2 message context of the request */
private MessageContext requestMessageContext;

private Long queuedTime = null;

private PassThroughConfiguration conf = PassThroughConfiguration.getInstance();

public ClientWorker(TargetConfiguration targetConfiguration,
MessageContext outMsgCtx,
TargetResponse response) {
this.queuedTime = System.currentTimeMillis();
this.targetConfiguration = targetConfiguration;
this.response = response;
this.expectEntityBody = response.isExpectResponseBody();
Expand Down Expand Up @@ -226,12 +229,10 @@ public void run() {
// Mark the start of the request at the beginning of the worker thread
response.getConnection().getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_THREAD_STATUS,
PassThroughConstants.THREAD_STATUS_RUNNING);
Object queuedTime =
response.getConnection().getContext().getAttribute(PassThroughConstants.CLIENT_WORKER_SIDE_QUEUED_TIME);

Long expectedMaxQueueingTime = conf.getExpectedMaxQueueingTime();
if (queuedTime != null && expectedMaxQueueingTime != null) {
Long clientWorkerQueuedTime = System.currentTimeMillis() - (Long) queuedTime;
Long clientWorkerQueuedTime = System.currentTimeMillis() - queuedTime;
if (clientWorkerQueuedTime >= expectedMaxQueueingTime) {
log.warn("Client worker thread queued time exceeds the expected max queueing time. Expected max "
+ "queueing time : " + expectedMaxQueueingTime + "ms. Actual queued time : "
Expand All @@ -245,6 +246,16 @@ public void run() {
((NHttpServerConnection) responseMsgCtx.getProperty(PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION)).
getContext().setAttribute(PassThroughConstants.CLIENT_WORKER_START_TIME, System.currentTimeMillis());
}

if (response.isForceShutdownConnectionOnComplete() && !conf.isConsumeAndDiscardBySecondaryWorkerPool()
&& conf.isConsumeAndDiscard()) {
// If an error has happened in the request processing, consumes the data in pipe completely and discard it
try {
RelayUtils.discardRequestMessage(requestMessageContext);
} catch (AxisFault af) {
log.error("Fault discarding request message", af);
}
}
try {
if (expectEntityBody) {
String cType = response.getHeader(HTTP.CONTENT_TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import org.apache.commons.logging.LogFactory;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.synapse.commons.CorrelationConstants;
import org.apache.synapse.transport.passthru.config.PassThroughConfiguration;
import org.apache.synapse.transport.passthru.config.TargetConfiguration;
import org.apache.synapse.transport.passthru.util.PassThroughTransportUtils;
import org.apache.synapse.transport.passthru.util.RelayUtils;

public class MessageDiscardWorker implements Runnable {

private Log log = LogFactory.getLog(MessageDiscardWorker.class);
private ClientWorker clientWorker = null;

TargetConfiguration targetConfiguration = null;

Expand All @@ -38,16 +40,36 @@ public class MessageDiscardWorker implements Runnable {

NHttpClientConnection conn = null;

private Long queuedTime = null;

private PassThroughConfiguration conf = PassThroughConfiguration.getInstance();

public MessageDiscardWorker(MessageContext requestMsgContext, TargetResponse response,
TargetConfiguration targetConfiguration, ClientWorker clientWorker, NHttpClientConnection conn) {
TargetConfiguration targetConfiguration, NHttpClientConnection conn) {
this.response = response;
this.requestMessageContext = requestMsgContext;
this.targetConfiguration = targetConfiguration;
this.clientWorker = clientWorker;
this.conn = conn;
this.queuedTime = System.currentTimeMillis();
}

public void run() {
NHttpServerConnection sourceConn = (NHttpServerConnection) requestMessageContext.getProperty(
PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION);

// Mark the start of the request message discard worker at the beginning of the worker thread
setThreadState(sourceConn, PassThroughConstants.THREAD_STATUS_RUNNING);
Long expectedMaxQueueingTime = conf.getExpectedMaxQueueingTimeForMessageDiscardWorker();
if (queuedTime != null && expectedMaxQueueingTime != null) {
Long messageDiscardWorkerQueuedTime = System.currentTimeMillis() - queuedTime;
if (messageDiscardWorkerQueuedTime >= expectedMaxQueueingTime) {
log.warn("Message discard worker queued time exceeds the expected max queueing time. Expected "
+ "max queueing time : " + expectedMaxQueueingTime + "ms. Actual queued time : "
+ messageDiscardWorkerQueuedTime + "ms"+ ", CORRELATION_ID : "
+ requestMessageContext.getProperty(CorrelationConstants.CORRELATION_ID));
}

}

// If an error has happened in the request processing, consumes the data in pipe completely and discard it
try {
Expand All @@ -56,42 +78,24 @@ public void run() {
log.error("Fault discarding request message", af);
}

targetConfiguration.getWorkerPool().execute(clientWorker);
// Mark the end of the request message discard worker at the end of the worker thread
setThreadState(sourceConn, PassThroughConstants.THREAD_STATUS_FINISHED);
targetConfiguration.getWorkerPool().execute(new ClientWorker(targetConfiguration, requestMessageContext, response));

targetConfiguration.getMetrics().incrementMessagesReceived();

NHttpServerConnection sourceConn = (NHttpServerConnection) requestMessageContext.getProperty(
PassThroughConstants.PASS_THROUGH_SOURCE_CONNECTION);
if (sourceConn != null) {
sourceConn.getContext().setAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME)
);
conn.getContext().removeAttribute(PassThroughConstants.RES_HEADER_ARRIVAL_TIME);

sourceConn.getContext().setAttribute(PassThroughConstants.REQ_DEPARTURE_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.REQ_DEPARTURE_TIME)
);
conn.getContext().removeAttribute(PassThroughConstants.REQ_DEPARTURE_TIME);
sourceConn.getContext().setAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME)
);

conn.getContext().removeAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_START_TIME);
sourceConn.getContext().setAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME)
);
conn.getContext().removeAttribute(PassThroughConstants.REQ_TO_BACKEND_WRITE_END_TIME);
sourceConn.getContext().setAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME,
conn.getContext()
.getAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME)
);
conn.getContext().removeAttribute(PassThroughConstants.RES_FROM_BACKEND_READ_START_TIME);

PassThroughTransportUtils.setSourceConnectionContextAttributes(sourceConn, conn);
}

}

private void setThreadState(NHttpServerConnection sourceConn, String threadState) {
conn.getContext().setAttribute(PassThroughConstants.MESSAGE_DISCARD_WORKER_THREAD_STATUS,
threadState);
if (sourceConn != null) {
sourceConn.getContext().setAttribute(PassThroughConstants.MESSAGE_DISCARD_WORKER_THREAD_STATUS,
threadState);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,14 @@ public class PassThroughConstants {

public static final String CLIENT_WORKER_THREAD_STATUS = "CLIENT_WORKER_THREAD_STATUS";

public static final String SERVER_WORKER_SIDE_QUEUED_TIME = "SERVER_WORKER_SIDE_QUEUED_TIME";

public static final String CLIENT_WORKER_SIDE_QUEUED_TIME = "CLIENT_WORKER_SIDE_QUEUED_TIME";
public static final String MESSAGE_DISCARD_WORKER_THREAD_STATUS = "MESSAGE_DISCARD_WORKER_THREAD_STATUS";

public static final String THREAD_STATUS_RUNNING = "RUNNING";

public static final String THREAD_STATUS_MARKED = "MARKED";

public static final String THREAD_STATUS_FINISHED = "FINISHED";

public static final String SERVER_WORKER_START_TIME = "SERVER_WORKER_START_TIME";

public static final String CLIENT_WORKER_INIT_TIME = "CLIENT_WORKER_INIT_TIME";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public class ServerWorker implements Runnable {
private PassThroughConfiguration conf = PassThroughConfiguration.getInstance();

private OutputStream os; //only used for WSDL requests..

private Long queuedTime = null;

public ServerWorker(final SourceRequest request,
final SourceConfiguration sourceConfiguration,final OutputStream os) {
Expand All @@ -130,6 +132,7 @@ public ServerWorker(final SourceRequest request,
request.getConnection().getContext().getAttribute(SynapseDebugInfoHolder.SYNAPSE_WIRE_LOG_HOLDER_PROPERTY));
request.getConnection().getContext().setAttribute(NhttpConstants.SERVER_WORKER_INIT_TIME,
System.currentTimeMillis());
queuedTime = System.currentTimeMillis();
}

public ServerWorker(final SourceRequest request,
Expand All @@ -146,12 +149,10 @@ public void run() {
// Mark the start of the request at the beginning of the worker thread
request.getConnection().getContext().setAttribute(PassThroughConstants.SERVER_WORKER_THREAD_STATUS,
PassThroughConstants.THREAD_STATUS_RUNNING);
Object queuedTime =
request.getConnection().getContext().getAttribute(PassThroughConstants.SERVER_WORKER_SIDE_QUEUED_TIME);

Long expectedMaxQueueingTime = conf.getExpectedMaxQueueingTime();
if (queuedTime != null && expectedMaxQueueingTime != null) {
Long serverWorkerQueuedTime = System.currentTimeMillis() - (Long) queuedTime;
Long serverWorkerQueuedTime = System.currentTimeMillis() - queuedTime;
if (serverWorkerQueuedTime >= expectedMaxQueueingTime) {
log.warn("Server worker thread queued time exceeds the expected max queueing time. Expected max " +
"queueing time : " + expectedMaxQueueingTime + "ms. Actual queued time : " +
Expand Down
Loading

0 comments on commit b11ce26

Please sign in to comment.