diff --git a/docs/src/main/asciidoc/mp/threading.adoc b/docs/src/main/asciidoc/mp/threading.adoc index 5e613936de5..d2f926c8810 100644 --- a/docs/src/main/asciidoc/mp/threading.adoc +++ b/docs/src/main/asciidoc/mp/threading.adoc @@ -49,7 +49,9 @@ yet this process is still underway and some legacy libraries may never be fully Helidon MP supports a new `@ExecuteOn` annotation to give developers full control on how to run tasks. This annotation can be applied to any CDI bean method to control the type of thread in -which invocations of that method shall execute on. +which invocations of that method shall execute on. If such a method returns `CompletionStage` +or `CompletableFuture`, it is assumed to be asynchronous and shall execute in a new thread +but without blocking the caller's thread. include::{rootdir}/includes/dependencies.adoc[] @@ -129,10 +131,20 @@ but that is not a requirement in CDI. include::{sourcedir}/mp/ExecuteOnSnippets.java[tag=snippet_2, indent=0] ---- -3. Finally, it is also possible to explicitly execute a method in a -virtual thread, blocking the caller thread until the method execution is complete. +3. It is also possible to explicitly execute a method in a +virtual thread, blocking the caller's thread until the method execution is complete. + [source,java] ---- include::{sourcedir}/mp/ExecuteOnSnippets.java[tag=snippet_3, indent=0] ---- + +4. Finally, a method can be executed in another thread but without blocking +the caller's thread. This behavior is triggered automatically when the bean method returns +`CompletionStage` or `CompletableFuture`. ++ +[source,java] +---- +include::{sourcedir}/mp/ExecuteOnSnippets.java[tag=snippet_4, indent=0] +---- + diff --git a/docs/src/main/java/io/helidon/docs/mp/ExecuteOnSnippets.java b/docs/src/main/java/io/helidon/docs/mp/ExecuteOnSnippets.java index d3258a00d17..b82a0848fbb 100644 --- a/docs/src/main/java/io/helidon/docs/mp/ExecuteOnSnippets.java +++ b/docs/src/main/java/io/helidon/docs/mp/ExecuteOnSnippets.java @@ -17,6 +17,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import io.helidon.microprofile.cdi.ExecuteOn; import jakarta.enterprise.inject.Produces; @@ -69,4 +71,15 @@ void someTask() { } } // end::snippet_3[] + + // tag::snippet_4[] + public class MyVirtualBeanAsync { + + @ExecuteOn(ThreadType.VIRTUAL) + CompletionStage someTask() { + // run task on virtual thread without blocking caller + return CompletableFuture.completedFuture("DONE"); + } + } + // end::snippet_4[] } diff --git a/microprofile/cdi/src/main/java/io/helidon/microprofile/cdi/ExecuteOn.java b/microprofile/cdi/src/main/java/io/helidon/microprofile/cdi/ExecuteOn.java index eb08d44c41c..34d1015c4a0 100644 --- a/microprofile/cdi/src/main/java/io/helidon/microprofile/cdi/ExecuteOn.java +++ b/microprofile/cdi/src/main/java/io/helidon/microprofile/cdi/ExecuteOn.java @@ -27,7 +27,9 @@ import jakarta.interceptor.InterceptorBinding; /** - * Annotates a CDI bean method that shall be executed on a new thread. + * Annotates a CDI bean method that shall be executed on a new thread. If the method returns + * {@link java.util.concurrent.CompletableFuture} or {@link java.util.concurrent.CompletionStage}, + * it is assumed to be asynchronous. */ @Documented @Retention(RetentionPolicy.RUNTIME) @@ -65,7 +67,7 @@ enum ThreadType { ThreadType value() default ThreadType.PLATFORM; /** - * Waiting timeout. + * Waiting timeout, used when the method is synchronous. * * @return waiting timeout */ @@ -73,7 +75,7 @@ enum ThreadType { long timeout() default 10000L; /** - * Waiting time unit. + * Waiting time unit, used when the method is synchronous. * * @return waiting time unit */ diff --git a/microprofile/cdi/src/main/java/io/helidon/microprofile/cdi/ExecuteOnExtension.java b/microprofile/cdi/src/main/java/io/helidon/microprofile/cdi/ExecuteOnExtension.java index 2ef68aab964..456818b5ce8 100644 --- a/microprofile/cdi/src/main/java/io/helidon/microprofile/cdi/ExecuteOnExtension.java +++ b/microprofile/cdi/src/main/java/io/helidon/microprofile/cdi/ExecuteOnExtension.java @@ -18,8 +18,11 @@ import java.lang.reflect.Method; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import io.helidon.common.LazyValue; @@ -32,6 +35,7 @@ import jakarta.enterprise.inject.spi.Bean; import jakarta.enterprise.inject.spi.BeanManager; import jakarta.enterprise.inject.spi.BeforeBeanDiscovery; +import jakarta.enterprise.inject.spi.DeploymentException; import jakarta.enterprise.inject.spi.Extension; import jakarta.enterprise.inject.spi.ProcessManagedBean; import jakarta.enterprise.inject.spi.ProcessSyntheticBean; @@ -41,7 +45,13 @@ */ public class ExecuteOnExtension implements Extension { + enum MethodType { + BLOCKING, + NON_BLOCKING + }; + private final LazyValue>> methodMap = LazyValue.create(ConcurrentHashMap::new); + private final LazyValue> methodType = LazyValue.create(ConcurrentHashMap::new); void registerMethods(BeanManager bm, @Observes ProcessSyntheticBean event) { registerMethods(bm.createAnnotatedType(event.getBean().getBeanClass())); @@ -54,7 +64,9 @@ void registerMethods(@Observes ProcessManagedBean event) { private void registerMethods(AnnotatedType type) { for (AnnotatedMethod annotatedMethod : type.getMethods()) { if (annotatedMethod.isAnnotationPresent(ExecuteOn.class)) { - methodMap.get().put(annotatedMethod.getJavaMember(), annotatedMethod); + Method method = annotatedMethod.getJavaMember(); + methodMap.get().put(method, annotatedMethod); + methodType.get().put(method, findMethodType(method)); } } } @@ -63,6 +75,19 @@ void validateAnnotations(BeanManager bm, @Observes @Initialized(ApplicationScope methodMap.get().forEach((method, annotatedMethod) -> validateExecutor(bm, annotatedMethod)); } + + private static MethodType findMethodType(Method method) { + Class returnType = method.getReturnType(); + if (CompletionStage.class.isAssignableFrom(returnType) + || CompletableFuture.class.isAssignableFrom(returnType)) { + return MethodType.NON_BLOCKING; + } + if (Future.class.equals(returnType)) { + throw new DeploymentException("Future is not supported as return type of ExecuteOn method"); + } + return MethodType.BLOCKING; + } + private static void validateExecutor(BeanManager bm, AnnotatedMethod method) { ExecuteOn executeOn = method.getAnnotation(ExecuteOn.class); if (executeOn.value() == ExecuteOn.ThreadType.EXECUTOR) { @@ -85,6 +110,10 @@ ExecuteOn getAnnotation(Method method) { throw new IllegalArgumentException("Unable to map method " + method); } + MethodType getMethodType(Method method) { + return methodType.get().get(method); + } + void registerInterceptors(@Observes BeforeBeanDiscovery discovery, BeanManager bm) { discovery.addAnnotatedType(bm.createAnnotatedType(ExecuteOnInterceptor.class), ExecuteOnInterceptor.class.getName()); @@ -92,5 +121,6 @@ void registerInterceptors(@Observes BeforeBeanDiscovery discovery, BeanManager b void clearMethodMap() { methodMap.get().clear(); + methodType.get().clear(); } } diff --git a/microprofile/cdi/src/main/java/io/helidon/microprofile/cdi/ExecuteOnInterceptor.java b/microprofile/cdi/src/main/java/io/helidon/microprofile/cdi/ExecuteOnInterceptor.java index a18cca3901f..d8f34c51812 100644 --- a/microprofile/cdi/src/main/java/io/helidon/microprofile/cdi/ExecuteOnInterceptor.java +++ b/microprofile/cdi/src/main/java/io/helidon/microprofile/cdi/ExecuteOnInterceptor.java @@ -16,7 +16,13 @@ package io.helidon.microprofile.cdi; +import java.lang.reflect.Method; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import io.helidon.common.LazyValue; import io.helidon.common.configurable.ThreadPoolSupplier; @@ -47,56 +53,111 @@ class ExecuteOnInterceptor { private static final LazyValue PLATFORM_EXECUTOR_SERVICE = LazyValue.create(() -> { - Config mpConfig = ConfigProvider.getConfig(); - io.helidon.config.Config config = MpConfig.toHelidonConfig(mpConfig); - return ThreadPoolSupplier.builder() - .threadNamePrefix(EXECUTE_ON) - .config(config.get(RUN_ON_PLATFORM_THREAD)) - .virtualThreads(false) // overrides to platform threads - .build() - .get(); - }); + Config mpConfig = ConfigProvider.getConfig(); + io.helidon.config.Config config = MpConfig.toHelidonConfig(mpConfig); + return ThreadPoolSupplier.builder() + .threadNamePrefix(EXECUTE_ON) + .config(config.get(RUN_ON_PLATFORM_THREAD)) + .virtualThreads(false) // overrides to platform threads + .build() + .get(); + }); private static final LazyValue VIRTUAL_EXECUTOR_SERVICE = LazyValue.create(() -> { - Config mpConfig = ConfigProvider.getConfig(); - io.helidon.config.Config config = MpConfig.toHelidonConfig(mpConfig); - String threadNamePrefix = config.get(RUN_ON_VIRTUAL_THREAD) - .get("thread-name-prefix") - .asString() - .asOptional() - .orElse(EXECUTE_ON); - return ThreadPoolSupplier.builder() - .threadNamePrefix(threadNamePrefix) - .virtualThreads(true) - .build() - .get(); - }); + Config mpConfig = ConfigProvider.getConfig(); + io.helidon.config.Config config = MpConfig.toHelidonConfig(mpConfig); + String threadNamePrefix = config.get(RUN_ON_VIRTUAL_THREAD) + .get("thread-name-prefix") + .asString() + .asOptional() + .orElse(EXECUTE_ON); + return ThreadPoolSupplier.builder() + .threadNamePrefix(threadNamePrefix) + .virtualThreads(true) + .build() + .get(); + }); @Inject private ExecuteOnExtension extension; /** - * Intercepts a call to bean method annotated by {@code @OnNewThread}. + * Intercepts a call to bean method annotated by {@link io.helidon.microprofile.cdi.ExecuteOn}. * * @param context Invocation context. * @return Whatever the intercepted method returns. * @throws Throwable If a problem occurs. */ @AroundInvoke + @SuppressWarnings("unchecked") public Object executeOn(InvocationContext context) throws Throwable { - ExecuteOn executeOn = extension.getAnnotation(context.getMethod()); - return switch (executeOn.value()) { - case PLATFORM -> PLATFORM_EXECUTOR_SERVICE.get() - .submit(context::proceed) - .get(executeOn.timeout(), executeOn.unit()); - case VIRTUAL -> VIRTUAL_EXECUTOR_SERVICE.get() - .submit(context::proceed) - .get(executeOn.timeout(), executeOn.unit()); - case EXECUTOR -> findExecutor(executeOn.executorName()) - .submit(context::proceed) - .get(executeOn.timeout(), executeOn.unit()); + Method method = context.getMethod(); + ExecuteOn executeOn = extension.getAnnotation(method); + + // find executor service to use + ExecutorService executorService = switch (executeOn.value()) { + case PLATFORM -> PLATFORM_EXECUTOR_SERVICE.get(); + case VIRTUAL -> VIRTUAL_EXECUTOR_SERVICE.get(); + case EXECUTOR -> findExecutor(executeOn.executorName()); }; + + switch (extension.getMethodType(method)) { + case BLOCKING: + // block until call completes + return executorService.submit(context::proceed).get(executeOn.timeout(), executeOn.unit()); + case NON_BLOCKING: + // execute call asynchronously + CompletableFuture supplyFuture = CompletableFuture.supplyAsync( + () -> { + try { + return context.proceed(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, executorService); + + // return new, cancellable completable future + AtomicBoolean mayInterrupt = new AtomicBoolean(false); + CompletableFuture resultFuture = new CompletableFuture<>() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + mayInterrupt.set(mayInterruptIfRunning); + return super.cancel(mayInterruptIfRunning); + } + }; + + // link completion of supplyFuture with resultFuture + supplyFuture.whenComplete((result, throwable) -> { + if (throwable == null) { + // result must be CompletionStage or CompletableFuture + CompletableFuture cfResult = !(result instanceof CompletableFuture) + ? ((CompletionStage) result).toCompletableFuture() + : (CompletableFuture) result; + cfResult.whenComplete((r, t) -> { + if (t == null) { + resultFuture.complete(r); + } else { + resultFuture.completeExceptionally(unwrapThrowable(t)); + } + }); + } else { + resultFuture.completeExceptionally(unwrapThrowable(throwable)); + } + }); + + // if resultFuture is cancelled, then cancel supplyFuture + resultFuture.exceptionally(t -> { + if (t instanceof CancellationException) { + supplyFuture.cancel(mayInterrupt.get()); + } + return null; + }); + + return resultFuture; + default: + throw new IllegalStateException("Unrecognized ExecuteOn method type"); + } } /** @@ -108,4 +169,14 @@ public Object executeOn(InvocationContext context) throws Throwable { private static ExecutorService findExecutor(String executorName) { return CDI.current().select(ExecutorService.class, NamedLiteral.of(executorName)).get(); } + + /** + * Extract underlying throwable. + * + * @param t the throwable + * @return the wrapped throwable + */ + private static Throwable unwrapThrowable(Throwable t) { + return t instanceof ExecutionException ? t.getCause() : t; + } } diff --git a/microprofile/cdi/src/test/java/io/helidon/microprofile/cdi/ExecuteOnAsyncTest.java b/microprofile/cdi/src/test/java/io/helidon/microprofile/cdi/ExecuteOnAsyncTest.java new file mode 100644 index 00000000000..c0c5186b1bc --- /dev/null +++ b/microprofile/cdi/src/test/java/io/helidon/microprofile/cdi/ExecuteOnAsyncTest.java @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.microprofile.cdi; + +import java.util.Optional; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.helidon.common.context.Context; +import io.helidon.common.context.Contexts; + +import jakarta.enterprise.inject.Produces; +import jakarta.enterprise.inject.se.SeContainer; +import jakarta.enterprise.inject.se.SeContainerInitializer; +import jakarta.enterprise.inject.spi.CDI; +import jakarta.inject.Named; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static io.helidon.microprofile.cdi.ExecuteOn.ThreadType; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + +class ExecuteOnAsyncTest { + + public static final int SHORT_TIMEOUT = 500; + public static final int LONG_TIMEOUT = 10000; + + static SeContainer seContainer; + static OnNewThreadBean bean; + + @BeforeAll + @SuppressWarnings("unchecked") + static void startCdi() { + seContainer = SeContainerInitializer.newInstance() + .disableDiscovery() + .addExtensions(ExecuteOnExtension.class) + .addBeanClasses(OnNewThreadBean.class) + .initialize(); + bean = CDI.current().select(OnNewThreadBean.class).get(); + } + + @AfterAll + static void stopCdi() { + seContainer.close(); + } + + static class OnNewThreadBean { + + @ExecuteOn(ThreadType.PLATFORM) + CompletionStage cpuIntensive() { + return CompletableFuture.completedFuture(Thread.currentThread()); + } + + @ExecuteOn(value = ThreadType.PLATFORM) + CompletableFuture evenMoreCpuIntensive() { + return CompletableFuture.completedFuture(Thread.currentThread()); + } + + @ExecuteOn(ThreadType.VIRTUAL) + CompletionStage onVirtualThread() { + return CompletableFuture.completedFuture(Thread.currentThread()); + } + + @ExecuteOn(value = ThreadType.EXECUTOR, executorName = "my-executor") + CompletableFuture onMyExecutor() { + return CompletableFuture.completedFuture(Thread.currentThread()); + } + + @ExecuteOn(ThreadType.VIRTUAL) + CompletionStage> verifyContextVirtual() { + return CompletableFuture.completedFuture( + Contexts.context().flatMap(context -> context.get("hello", String.class))); + } + + @ExecuteOn(ThreadType.PLATFORM) + CompletableFuture> verifyContextPlatform() { + return CompletableFuture.completedFuture( + Contexts.context().flatMap(context -> context.get("hello", String.class))); + } + + @ExecuteOn(ThreadType.VIRTUAL) + CompletableFuture eternallyBlocked() throws BrokenBarrierException, InterruptedException { + CyclicBarrier barrier = new CyclicBarrier(2); + barrier.await(); + return CompletableFuture.completedFuture(Thread.currentThread()); + } + + @ExecuteOn(ThreadType.VIRTUAL) + CompletableFuture alwaysFails() { + return CompletableFuture.failedFuture(new UnsupportedOperationException("Not supported")); + } + + @Produces + @Named("my-executor") + ExecutorService myExecutor() { + return Executors.newFixedThreadPool(2); + } + } + + @Test + void cpuIntensiveTest() throws ExecutionException, InterruptedException, TimeoutException { + CompletionStage completionStage = bean.cpuIntensive(); + Thread thread = completionStage.toCompletableFuture().get(LONG_TIMEOUT, TimeUnit.MILLISECONDS); + assertThat(thread.isVirtual(), is(false)); + assertThat(thread.getName().startsWith("my-platform-thread"), is(true)); + } + + @Test + void evenMoreCpuIntensiveTest() throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture completableFuture = bean.evenMoreCpuIntensive(); + Thread thread = completableFuture.get(LONG_TIMEOUT, TimeUnit.MILLISECONDS); + assertThat(thread.isVirtual(), is(false)); + assertThat(thread.getName().startsWith("my-platform-thread"), is(true)); + } + + @Test + void onVirtualThread() throws ExecutionException, InterruptedException, TimeoutException { + CompletionStage completionStage = bean.onVirtualThread(); + Thread thread = completionStage.toCompletableFuture().get(LONG_TIMEOUT, TimeUnit.MILLISECONDS); + assertThat(thread.isVirtual(), is(true)); + assertThat(thread.getName().startsWith("my-virtual-thread"), is(true)); + } + + @Test + void onMyExecutor() throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture completableFuture = bean.onMyExecutor(); + Thread thread = completableFuture.get(LONG_TIMEOUT, TimeUnit.MILLISECONDS); + assertThat(thread.isVirtual(), is(false)); + assertThat(thread.getName().startsWith("pool"), is(true)); + } + + @Test + void verifyContextVirtual() throws ExecutionException, InterruptedException, TimeoutException { + Context context = Contexts.globalContext(); + context.register("hello", "world"); + CompletionStage> completionStage = Contexts.runInContext(context, bean::verifyContextVirtual); + Optional optional = completionStage.toCompletableFuture().get(LONG_TIMEOUT, TimeUnit.MILLISECONDS); + assertThat(optional.orElseThrow(), is("world")); + } + + @Test + void verifyContextPlatform() throws ExecutionException, InterruptedException, TimeoutException { + Context context = Contexts.globalContext(); + context.register("hello", "world"); + CompletableFuture> completableFuture = Contexts.runInContext(context, bean::verifyContextPlatform); + Optional optional = completableFuture.get(LONG_TIMEOUT, TimeUnit.MILLISECONDS); + assertThat(optional.orElseThrow(), is("world")); + } + + @Test + void testEternallyBlocked() throws Exception { + CompletableFuture completableFuture = bean.eternallyBlocked(); + assertThrows(TimeoutException.class, + () -> completableFuture.get(SHORT_TIMEOUT, TimeUnit.MILLISECONDS)); + completableFuture.cancel(true); + assertThrows(CancellationException.class, + () -> completableFuture.get(LONG_TIMEOUT, TimeUnit.MILLISECONDS)); + } + + @Test + void testAlwaysFails() { + CompletableFuture completableFuture = bean.alwaysFails(); + try { + completableFuture.get(LONG_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + assertThat(e.getCause(), is(instanceOf(UnsupportedOperationException.class))); + } catch (Exception e) { + fail(); + } + } +}