diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java index aa550df9c57..9a78993bdc4 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java @@ -407,25 +407,18 @@ public void onFailure(int request, Throwable failure) @Override public void close() { - if (stream != null) + try { - Runnable task = stream.getHttpChannel().onClose(); - if (task != null) + if (stream != null) { - ThreadPool.executeImmediately(getExecutor(), () -> - { - try - { - task.run(); - } - finally - { - super.close(); - } - }); - return; + Runnable task = stream.getHttpChannel().onClose(); + if (task != null) + task.run(); } } - super.close(); + finally + { + super.close(); + } } } diff --git a/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HTTP2ServerConnection.java b/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HTTP2ServerConnection.java index fb1ef25d20e..7a4ed06df21 100644 --- a/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HTTP2ServerConnection.java +++ b/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HTTP2ServerConnection.java @@ -180,19 +180,31 @@ public void onStreamTimeout(Stream stream, TimeoutException timeout, Promise { - channel.onTimeout(timeout, (task, timedOut) -> + if (task == null) { - if (task != null) - offerTask(task, true); promise.succeeded(timedOut); + return; + } + ThreadPool.executeImmediately(getExecutor(), () -> + { + try + { + task.run(); + promise.succeeded(timedOut); + } + catch (Throwable x) + { + promise.failed(x); + } }); - } - else - { - promise.succeeded(false); - } + }); } public void onStreamFailure(Stream stream, Throwable failure, Callback callback) diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/IdleTimeoutTest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/IdleTimeoutTest.java index 97e38e120ee..b24cf286b9b 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/IdleTimeoutTest.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/IdleTimeoutTest.java @@ -686,7 +686,6 @@ public void testServerIdleTimeoutIsEnforcedForQueuedRequest() throws Exception @Override public boolean handle(Request request, Response response, Callback callback) { - System.err.println("processing request " + request.getHttpURI().getPath()); requests.incrementAndGet(); handled.incrementAndGet(); phaser.get().countDown(); @@ -732,24 +731,24 @@ public void onHeaders(Stream stream, HeadersFrame frame) } // Send one more request to consume the whole session flow control window. - CountDownLatch resetLatch = new CountDownLatch(1); + CountDownLatch extraLatch = new CountDownLatch(1); MetaData.Request request = newRequest("GET", HttpFields.EMPTY); HeadersFrame frame = new HeadersFrame(request, null, false); FuturePromise promise = new FuturePromise<>(); client.newStream(frame, promise, new Stream.Listener() { @Override - public void onReset(Stream stream, ResetFrame frame, Callback callback) + public void onHeaders(Stream stream, HeadersFrame frame) { - callback.succeeded(); - resetLatch.countDown(); + responses.incrementAndGet(); + extraLatch.countDown(); } }); Stream stream = promise.get(5, TimeUnit.SECONDS); ByteBuffer data = ByteBuffer.allocate(((HTTP2Session)client).updateSendWindow(0)); stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP); - assertTrue(resetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + assertTrue(extraLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); // Wait for WINDOW_UPDATEs to be processed by the client. await().atMost(5, TimeUnit.SECONDS).until(() -> ((HTTP2Session)client).updateSendWindow(0), Matchers.greaterThan(0)); @@ -758,7 +757,9 @@ public void onReset(Stream stream, ResetFrame frame, Callback callback) await().atMost(5, TimeUnit.SECONDS).until(handled::get, is(0)); assertThat(requests.get(), is(count - 1)); - await().atMost(5, TimeUnit.SECONDS).until(responses::get, is(count - 1)); + // The first 2 requests are handled normally and responded with 200, the last 2 are + // not handled due to timeout while queued, but they are responded anyway with a 500. + await().atMost(5, TimeUnit.SECONDS).until(responses::get, is(count + 1)); } @Test diff --git a/jetty-core/jetty-http3/jetty-http3-server/src/main/java/org/eclipse/jetty/http3/server/HTTP3ServerConnectionFactory.java b/jetty-core/jetty-http3/jetty-http3-server/src/main/java/org/eclipse/jetty/http3/server/HTTP3ServerConnectionFactory.java index fde8bb5dbfb..7ecd3b24fdf 100644 --- a/jetty-core/jetty-http3/jetty-http3-server/src/main/java/org/eclipse/jetty/http3/server/HTTP3ServerConnectionFactory.java +++ b/jetty-core/jetty-http3/jetty-http3-server/src/main/java/org/eclipse/jetty/http3/server/HTTP3ServerConnectionFactory.java @@ -14,6 +14,7 @@ package org.eclipse.jetty.http3.server; import java.util.Objects; +import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; import org.eclipse.jetty.http.HttpFields; @@ -35,6 +36,7 @@ import org.eclipse.jetty.server.NetworkConnector; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.thread.ThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,14 +155,26 @@ public void onTrailer(Stream.Server stream, HeadersFrame frame) public void onIdleTimeout(Stream.Server stream, TimeoutException timeout, Promise promise) { HTTP3Stream http3Stream = (HTTP3Stream)stream; - getConnection().onIdleTimeout((HTTP3Stream)stream, timeout, (task, timedOut) -> + getConnection().onIdleTimeout(http3Stream, timeout, (task, timedOut) -> { - if (task != null) + if (task == null) { - ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession(); - protocolSession.offer(task, true); + promise.succeeded(timedOut); + return; } - promise.succeeded(timedOut); + Executor executor = http3Stream.getSession().getProtocolSession().getQuicSession().getExecutor(); + ThreadPool.executeImmediately(executor, () -> + { + try + { + task.run(); + promise.succeeded(timedOut); + } + catch (Throwable x) + { + promise.failed(x); + } + }); }); } @@ -168,12 +182,9 @@ public void onIdleTimeout(Stream.Server stream, TimeoutException timeout, Promis public void onFailure(Stream.Server stream, long error, Throwable failure) { HTTP3Stream http3Stream = (HTTP3Stream)stream; - Runnable task = getConnection().onFailure((HTTP3Stream)stream, failure); - if (task != null) - { - ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession(); - protocolSession.offer(task, true); - } + Runnable task = getConnection().onFailure(http3Stream, failure); + Executor executor = http3Stream.getSession().getProtocolSession().getQuicSession().getExecutor(); + ThreadPool.executeImmediately(executor, task); } } } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index 94e60222cca..560efcdfe0b 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -641,10 +641,16 @@ public boolean onIdleExpired(TimeoutException timeout) @Override public void close() { - Runnable task = _httpChannel.onClose(); - if (task != null) - task.run(); - super.close(); + try + { + Runnable task = _httpChannel.onClose(); + if (task != null) + task.run(); + } + finally + { + super.close(); + } } @Override diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java index dcae232c908..f5ec9c436a4 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java @@ -102,6 +102,18 @@ static void executeImmediately(Executor executor, Runnable task) if (task == null) return; + Invocable.InvocationType invocationType = Invocable.getInvocationType(task); + if (invocationType == Invocable.InvocationType.NON_BLOCKING) + { + task.run(); + return; + } + if (invocationType == Invocable.InvocationType.EITHER) + { + Invocable.invokeNonBlocking(task); + return; + } + if (executor instanceof TryExecutor tryExecutor && tryExecutor.tryExecute(task)) return; @@ -112,21 +124,13 @@ static void executeImmediately(Executor executor, Runnable task) return; } - switch (Invocable.getInvocationType(task)) + try + { + new Thread(task, "jetty-immediate-executor").start(); + } + catch (Throwable ignored) { - case NON_BLOCKING -> task.run(); - case EITHER -> Invocable.invokeNonBlocking(task); - default -> - { - try - { - new Thread(task).start(); - } - catch (Throwable ignored) - { - task.run(); - } - } + task.run(); } } }