Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #12313 - Jetty 12 ee9/ee10 doesn't invoke callbacks when h2 client sends RST_STREAM. #12370

Merged
merged 3 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ public void flush() throws IOException
private void checkWritable() throws EofException
{
if (_softClose)
throw new EofException("Closed");
throw new EofException("Closed");

switch (_state)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,20 +661,15 @@ else if (noStack != null)
LOG.warn(request == null ? "unknown request" : request.getServletApiRequest().getRequestURI(), failure);
}

if (isCommitted())
try
{
abort(failure);
boolean abort = _state.onError(failure);
if (abort)
abort(failure);
}
else
catch (Throwable x)
{
try
{
_state.onError(failure);
}
catch (IllegalStateException e)
{
abort(failure);
}
abort(failure);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -874,14 +874,15 @@ public boolean onIdleTimeout(TimeoutException timeout)
}
}

protected void onError(Throwable th)
protected boolean onError(Throwable th)
{
boolean committed = _servletChannel.isCommitted();
final AsyncContextEvent asyncEvent;
final List<AsyncListener> asyncListeners;
try (AutoLock ignored = lock())
{
if (LOG.isDebugEnabled())
LOG.debug("thrownException {}", getStatusStringLocked(), th);
LOG.debug("onError {}", getStatusStringLocked(), th);

// This can only be called from within the handle loop
if (_state != State.HANDLING)
Expand All @@ -890,34 +891,42 @@ protected void onError(Throwable th)
// If sendError has already been called, we can only handle one failure at a time!
if (_sendError)
{
LOG.warn("unhandled due to prior sendError", th);
return;
LOG.warn("onError not handled due to prior sendError() {}", getStatusStringLocked(), th);
return false;
}

// Check async state to determine type of handling
switch (_requestState)
{
case BLOCKING:
// handle the exception with a sendError
{
// Handle the exception with a sendError.
if (committed)
return true;
sendError(th);
return;

return false;
}
case DISPATCH: // Dispatch has already been called, but we ignore and handle exception below
case COMPLETE: // Complete has already been called, but we ignore and handle exception below
case ASYNC:
{
if (_asyncListeners == null || _asyncListeners.isEmpty())
{
if (committed)
return true;
sendError(th);
return;
return false;
}
asyncEvent = _event;
asyncEvent.addThrowable(th);
asyncListeners = _asyncListeners;
break;

}
default:
LOG.warn("unhandled in state {}", _requestState, new IllegalStateException(th));
return;
{
LOG.warn("onError not handled due to invalid requestState {}", getStatusStringLocked(), th);
return false;
}
}
}

Expand Down Expand Up @@ -948,7 +957,10 @@ protected void onError(Throwable th)
{
// The listeners did not invoke API methods and the
// container must provide a default error dispatch.
if (committed)
return true;
sendError(th);
return false;
}
else if (_requestState != RequestState.COMPLETE)
{
Expand All @@ -957,12 +969,14 @@ else if (_requestState != RequestState.COMPLETE)
else
LOG.warn("unhandled in state {}", _requestState, new IllegalStateException(th));
}
return committed;
}
}

private void sendError(Throwable th)
{
// No sync as this is always called with lock held
assert _lock.isHeldByCurrentThread();

// Determine the actual details of the exception
final Request request = _servletChannel.getServletContextRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,7 @@ protected ContextRequest wrapRequest(Request request, Response response)

ServletContextRequest servletContextRequest = newServletContextRequest(servletChannel, request, response, decodedPathInContext, matchedResource);
servletChannel.associate(servletContextRequest);
Request.addCompletionListener(request, servletChannel::recycle);
Request.addCompletionListener(servletContextRequest, servletChannel::recycle);
return servletContextRequest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.eclipse.jetty.ee10.test.client.transport;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -21,12 +22,14 @@
import jakarta.servlet.AsyncContext;
import jakarta.servlet.AsyncEvent;
import jakarta.servlet.AsyncListener;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHolder;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
Expand Down Expand Up @@ -54,11 +57,12 @@

public class Http2AsyncIOServletTest
{
private final HttpConfiguration httpConfig = new HttpConfiguration();
private Server server;
private ServerConnector connector;
private HTTP2Client client;

private void start(HttpConfiguration httpConfig, HttpServlet httpServlet) throws Exception
private void start(HttpServlet httpServlet) throws Exception
{
server = new Server();
connector = new ServerConnector(server, 1, 1, new HTTP2CServerConnectionFactory(httpConfig));
Expand All @@ -83,12 +87,10 @@ public void tearDown()
@ValueSource(booleans = {true, false})
public void testStartAsyncThenClientResetRemoteErrorNotification(boolean notify) throws Exception
{
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.setNotifyRemoteAsyncErrors(notify);

AtomicReference<AsyncEvent> errorAsyncEventRef = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
start(httpConfig, new HttpServlet()
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response)
Expand Down Expand Up @@ -138,14 +140,82 @@ public void onStartAsync(AsyncEvent event)
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code));

if (notify)
{
// Wait for the reset to be notified to the async context listener.
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
AsyncEvent asyncEvent = errorAsyncEventRef.get();
return asyncEvent == null ? null : asyncEvent.getThrowable();
}, instanceOf(EofException.class));
}
else
{
// Wait for the reset to NOT be notified to the failure listener.
await().atMost(5, TimeUnit.SECONDS).during(1, TimeUnit.SECONDS).until(errorAsyncEventRef::get, nullValue());
}
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testClientResetNotifiesAsyncListener(boolean commitResponse) throws Exception
{
CountDownLatch requestLatch = new CountDownLatch(1);
CountDownLatch errorLatch = new CountDownLatch(1);
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
if (commitResponse)
response.flushBuffer();

AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);

asyncContext.addListener(new AsyncListener()
{
@Override
public void onComplete(AsyncEvent event)
{
}

@Override
public void onTimeout(AsyncEvent event)
{
}

@Override
public void onError(AsyncEvent event)
{
if (!response.isCommitted())
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
asyncContext.complete();
errorLatch.countDown();
}

@Override
public void onStartAsync(AsyncEvent event)
{
}
});

requestLatch.countDown();
}
});

Session session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
.get(5, TimeUnit.SECONDS);
MetaData.Request request = new MetaData.Request("GET", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY);
Stream stream = session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS);

// Wait for the server to become idle after the request.
assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
Thread.sleep(500);

// Reset the stream.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code));

assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand All @@ -56,17 +57,18 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

@ExtendWith(WorkDirExtension.class)
public class Http3AsyncIOServletTest
{
public WorkDir workDir;

private final HttpConfiguration httpConfig = new HttpConfiguration();
private Server server;
private QuicServerConnector connector;
private HTTP3Client client;

private void start(HttpConfiguration httpConfig, HttpServlet httpServlet) throws Exception
private void start(HttpServlet httpServlet) throws Exception
{
server = new Server();
SslContextFactory.Server serverSslContextFactory = new SslContextFactory.Server();
Expand Down Expand Up @@ -95,12 +97,10 @@ public void tearDown()
@ValueSource(booleans = {true, false})
public void testStartAsyncThenClientResetRemoteErrorNotification(boolean notify) throws Exception
{
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.setNotifyRemoteAsyncErrors(notify);

AtomicReference<AsyncEvent> errorAsyncEventRef = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
start(httpConfig, new HttpServlet()
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response)
Expand Down Expand Up @@ -158,4 +158,14 @@ public void onStartAsync(AsyncEvent event)
// Wait for the reset to NOT be notified to the failure listener.
await().atMost(5, TimeUnit.SECONDS).during(1, TimeUnit.SECONDS).until(errorAsyncEventRef::get, nullValue());
}

@Test
public void testClientResetNotifiesAsyncListener()
{
// See the equivalent test in Http2AsyncIOServletTest for HTTP/2.
// For HTTP/3 we do not have a "reset" event that we can relay to applications,
// because HTTP/3 does not have a "reset" frame; QUIC has RESET_STREAM, but we
// do not have an event from Quiche to reliably report it to applications.
assumeTrue(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,6 @@ private HttpInput.Content intercept()
// do not try to produce new raw content to get a fresher error
// when the special content was generated by the interceptor.
_error = true;
if (_httpChannel.getResponse().isCommitted())
_httpChannel.abort(error);
}
if (LOG.isDebugEnabled())
LOG.debug("interceptor generated special content {}", this);
Expand All @@ -446,9 +444,6 @@ private HttpInput.Content intercept()
// do not try to produce new raw content to get a fresher error
// when the special content was caused by the interceptor throwing.
_error = true;
Response response = _httpChannel.getResponse();
if (response.isCommitted())
_httpChannel.abort(failure);
if (LOG.isDebugEnabled())
LOG.debug("interceptor threw exception {}", this, x);
return _transformedContent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import jakarta.servlet.http.HttpSessionListener;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpCookie;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
Expand Down Expand Up @@ -2809,6 +2810,15 @@ else if (httpChannel.getContextHandler() == ContextHandler.this && !request.getC

CoreContextRequest coreContextRequest = new CoreContextRequest(request, this.getContext(), httpChannel);
httpChannel.onRequest(coreContextRequest);
HttpChannel channel = httpChannel;
org.eclipse.jetty.server.Request.addCompletionListener(coreContextRequest, x ->
{
// WebSocket needs a reference to the HttpServletRequest,
// so do not recycle the HttpChannel if it's a WebSocket
// request, no matter if the response is successful or not.
if (!request.getHeaders().contains(HttpHeader.SEC_WEBSOCKET_VERSION))
channel.recycle();
});
return coreContextRequest;
}

Expand Down
Loading
Loading