Skip to content

Commit

Permalink
Fixes #12313 - Jetty 12 ee9/ee10 doesn't invoke callbacks when h2 cli…
Browse files Browse the repository at this point in the history
…ent sends RST_STREAM.

* Fixed invocation of AsyncListener.onError(), now called even if the response is already committed, in both EE9 and EE10.
* Reworked EE9 HttpChannel state machine in case of failures to be like EE10's.
  In particular, calling abort now is a state change, rather than a failure of the Handler callback.
  In this way, the handle() loop continues, enters case TERMINATED, and the callback is completed in onCompleted().
* Fixed EE9 handling of idle timeout in HttpChannel.onRequest(), that was missing.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Oct 10, 2024
1 parent 7b22104 commit e75b294
Show file tree
Hide file tree
Showing 12 changed files with 363 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ public void flush() throws IOException
private void checkWritable() throws EofException
{
if (_softClose)
throw new EofException("Closed");
throw new EofException("Closed");

switch (_state)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,20 +661,15 @@ else if (noStack != null)
LOG.warn(request == null ? "unknown request" : request.getServletApiRequest().getRequestURI(), failure);
}

if (isCommitted())
try
{
abort(failure);
boolean abort = _state.onError(failure);
if (abort)
abort(failure);
}
else
catch (Throwable x)
{
try
{
_state.onError(failure);
}
catch (IllegalStateException e)
{
abort(failure);
}
abort(failure);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -874,14 +874,15 @@ public boolean onIdleTimeout(TimeoutException timeout)
}
}

protected void onError(Throwable th)
protected boolean onError(Throwable th)
{
boolean committed = _servletChannel.isCommitted();
final AsyncContextEvent asyncEvent;
final List<AsyncListener> asyncListeners;
try (AutoLock ignored = lock())
{
if (LOG.isDebugEnabled())
LOG.debug("thrownException {}", getStatusStringLocked(), th);
LOG.debug("onError {}", getStatusStringLocked(), th);

// This can only be called from within the handle loop
if (_state != State.HANDLING)
Expand All @@ -890,34 +891,42 @@ protected void onError(Throwable th)
// If sendError has already been called, we can only handle one failure at a time!
if (_sendError)
{
LOG.warn("unhandled due to prior sendError", th);
return;
LOG.warn("onError not handled due to prior sendError() {}", getStatusStringLocked(), th);
return false;
}

// Check async state to determine type of handling
switch (_requestState)
{
case BLOCKING:
// handle the exception with a sendError
{
// Handle the exception with a sendError.
if (committed)
return true;
sendError(th);
return;

return false;
}
case DISPATCH: // Dispatch has already been called, but we ignore and handle exception below
case COMPLETE: // Complete has already been called, but we ignore and handle exception below
case ASYNC:
{
if (_asyncListeners == null || _asyncListeners.isEmpty())
{
if (committed)
return true;
sendError(th);
return;
return false;
}
asyncEvent = _event;
asyncEvent.addThrowable(th);
asyncListeners = _asyncListeners;
break;

}
default:
LOG.warn("unhandled in state {}", _requestState, new IllegalStateException(th));
return;
{
LOG.warn("onError not handled due to invalid requestState {}", getStatusStringLocked(), th);
return false;
}
}
}

Expand Down Expand Up @@ -948,7 +957,10 @@ protected void onError(Throwable th)
{
// The listeners did not invoke API methods and the
// container must provide a default error dispatch.
if (committed)
return true;
sendError(th);
return false;
}
else if (_requestState != RequestState.COMPLETE)
{
Expand All @@ -957,12 +969,14 @@ else if (_requestState != RequestState.COMPLETE)
else
LOG.warn("unhandled in state {}", _requestState, new IllegalStateException(th));
}
return committed;
}
}

private void sendError(Throwable th)
{
// No sync as this is always called with lock held
assert _lock.isHeldByCurrentThread();

// Determine the actual details of the exception
final Request request = _servletChannel.getServletContextRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,7 @@ protected ContextRequest wrapRequest(Request request, Response response)

ServletContextRequest servletContextRequest = newServletContextRequest(servletChannel, request, response, decodedPathInContext, matchedResource);
servletChannel.associate(servletContextRequest);
Request.addCompletionListener(request, servletChannel::recycle);
Request.addCompletionListener(servletContextRequest, servletChannel::recycle);
return servletContextRequest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.eclipse.jetty.ee10.test.client.transport;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -21,12 +22,14 @@
import jakarta.servlet.AsyncContext;
import jakarta.servlet.AsyncEvent;
import jakarta.servlet.AsyncListener;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHolder;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
Expand Down Expand Up @@ -54,11 +57,12 @@

public class Http2AsyncIOServletTest
{
private final HttpConfiguration httpConfig = new HttpConfiguration();
private Server server;
private ServerConnector connector;
private HTTP2Client client;

private void start(HttpConfiguration httpConfig, HttpServlet httpServlet) throws Exception
private void start(HttpServlet httpServlet) throws Exception
{
server = new Server();
connector = new ServerConnector(server, 1, 1, new HTTP2CServerConnectionFactory(httpConfig));
Expand All @@ -83,12 +87,10 @@ public void tearDown()
@ValueSource(booleans = {true, false})
public void testStartAsyncThenClientResetRemoteErrorNotification(boolean notify) throws Exception
{
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.setNotifyRemoteAsyncErrors(notify);

AtomicReference<AsyncEvent> errorAsyncEventRef = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
start(httpConfig, new HttpServlet()
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response)
Expand Down Expand Up @@ -138,14 +140,82 @@ public void onStartAsync(AsyncEvent event)
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code));

if (notify)
{
// Wait for the reset to be notified to the async context listener.
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
AsyncEvent asyncEvent = errorAsyncEventRef.get();
return asyncEvent == null ? null : asyncEvent.getThrowable();
}, instanceOf(EofException.class));
}
else
{
// Wait for the reset to NOT be notified to the failure listener.
await().atMost(5, TimeUnit.SECONDS).during(1, TimeUnit.SECONDS).until(errorAsyncEventRef::get, nullValue());
}
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testClientResetNotifiesAsyncListener(boolean commitResponse) throws Exception
{
CountDownLatch requestLatch = new CountDownLatch(1);
CountDownLatch errorLatch = new CountDownLatch(1);
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
if (commitResponse)
response.flushBuffer();

AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);

asyncContext.addListener(new AsyncListener()
{
@Override
public void onComplete(AsyncEvent event)
{
}

@Override
public void onTimeout(AsyncEvent event)
{
}

@Override
public void onError(AsyncEvent event)
{
if (!response.isCommitted())
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
asyncContext.complete();
errorLatch.countDown();
}

@Override
public void onStartAsync(AsyncEvent event)
{
}
});

requestLatch.countDown();
}
});

Session session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
.get(5, TimeUnit.SECONDS);
MetaData.Request request = new MetaData.Request("GET", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY);
Stream stream = session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS);

// Wait for the server to become idle after the request.
assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
Thread.sleep(500);

// Reset the stream.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code));

assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand All @@ -56,17 +57,18 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

@ExtendWith(WorkDirExtension.class)
public class Http3AsyncIOServletTest
{
public WorkDir workDir;

private final HttpConfiguration httpConfig = new HttpConfiguration();
private Server server;
private QuicServerConnector connector;
private HTTP3Client client;

private void start(HttpConfiguration httpConfig, HttpServlet httpServlet) throws Exception
private void start(HttpServlet httpServlet) throws Exception
{
server = new Server();
SslContextFactory.Server serverSslContextFactory = new SslContextFactory.Server();
Expand Down Expand Up @@ -95,12 +97,10 @@ public void tearDown()
@ValueSource(booleans = {true, false})
public void testStartAsyncThenClientResetRemoteErrorNotification(boolean notify) throws Exception
{
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.setNotifyRemoteAsyncErrors(notify);

AtomicReference<AsyncEvent> errorAsyncEventRef = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
start(httpConfig, new HttpServlet()
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response)
Expand Down Expand Up @@ -158,4 +158,14 @@ public void onStartAsync(AsyncEvent event)
// Wait for the reset to NOT be notified to the failure listener.
await().atMost(5, TimeUnit.SECONDS).during(1, TimeUnit.SECONDS).until(errorAsyncEventRef::get, nullValue());
}

@Test
public void testClientResetNotifiesAsyncListener()
{
// See the equivalent test in Http2AsyncIOServletTest for HTTP/2.
// For HTTP/3 we do not have a "reset" event that we can relay to applications,
// because HTTP/3 does not have a "reset" frame; QUIC has RESET_STREAM, but we
// do not have an event from Quiche to reliably report it to applications.
assumeTrue(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2809,6 +2809,8 @@ else if (httpChannel.getContextHandler() == ContextHandler.this && !request.getC

CoreContextRequest coreContextRequest = new CoreContextRequest(request, this.getContext(), httpChannel);
httpChannel.onRequest(coreContextRequest);
HttpChannel channel = httpChannel;
org.eclipse.jetty.server.Request.addCompletionListener(coreContextRequest, x -> channel.recycle());
return coreContextRequest;
}

Expand Down
Loading

0 comments on commit e75b294

Please sign in to comment.