diff --git a/jms-core/src/main/java/io/micronaut/jms/annotations/Queue.java b/jms-core/src/main/java/io/micronaut/jms/annotations/Queue.java index f10a7543..1b859fd0 100644 --- a/jms-core/src/main/java/io/micronaut/jms/annotations/Queue.java +++ b/jms-core/src/main/java/io/micronaut/jms/annotations/Queue.java @@ -82,8 +82,12 @@ * option cannot be used in conjunction with {@link Queue#executor()}; if * both are specified the {@link Queue#executor()} value will be used. * + * @deprecated since 3.0.0, to align the implementation with the JMS model and the messaging libraries' presumptions. + * + * * @return the initial and max size of the thread pool */ + @Deprecated String concurrency() default "1-1"; /** @@ -102,8 +106,11 @@ * as part of a {@link JMSListener}. The executor can be maintained by * Micronaut using the {@link io.micronaut.scheduling.executor.UserExecutorConfiguration}. * + * @deprecated since 3.0.0, to align the implementation with the JMS model and the messaging libraries' presumptions. + * * @return the executor service bean name */ + @Deprecated String executor() default ""; /** diff --git a/jms-core/src/main/java/io/micronaut/jms/annotations/Topic.java b/jms-core/src/main/java/io/micronaut/jms/annotations/Topic.java index 34aa9db6..60e1cf49 100644 --- a/jms-core/src/main/java/io/micronaut/jms/annotations/Topic.java +++ b/jms-core/src/main/java/io/micronaut/jms/annotations/Topic.java @@ -72,8 +72,12 @@ * as part of a {@link JMSListener}. The executor can be maintained by * Micronaut using the {@link io.micronaut.scheduling.executor.UserExecutorConfiguration}. * + * @deprecated since 3.0.0, to align the implementation with the JMS model and the messaging libraries' presumptions. + * + * * @return the executor service name */ + @Deprecated String executor() default ""; /** diff --git a/jms-core/src/main/java/io/micronaut/jms/configuration/AbstractJMSListenerMethodProcessor.java b/jms-core/src/main/java/io/micronaut/jms/configuration/AbstractJMSListenerMethodProcessor.java index 59dcfa08..73586c3b 100644 --- a/jms-core/src/main/java/io/micronaut/jms/configuration/AbstractJMSListenerMethodProcessor.java +++ b/jms-core/src/main/java/io/micronaut/jms/configuration/AbstractJMSListenerMethodProcessor.java @@ -48,8 +48,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static javax.jms.Session.CLIENT_ACKNOWLEDGE; - /** * Abstract {@link ExecutableMethodProcessor} for annotations related to * {@link JMSListener}. Registers a {@link io.micronaut.jms.listener.JMSListener} @@ -113,8 +111,7 @@ private void validateArguments(ExecutableMethod method) { } private MessageListener generateAndBindListener(Object bean, - Executable method, - boolean acknowledge) { + Executable method) { return message -> { DefaultExecutableBinder binder = new DefaultExecutableBinder<>(); @@ -144,13 +141,13 @@ private void registerListener(ExecutableMethod method, final Object bean = beanContext.getBean(beanDefinition.getBeanType()); final ExecutorService executor = getExecutorService(destinationAnnotation); - MessageListener listener = generateAndBindListener(bean, method, CLIENT_ACKNOWLEDGE == acknowledgeMode); + MessageListener listener = generateAndBindListener(bean, method); Set errorHandlers = Stream.concat( Arrays.stream(destinationAnnotation.classValues("errorHandlers")), Arrays.stream(beanDefinition.classValues(JMSListener.class, "errorHandlers"))) .filter(JMSListenerErrorHandler.class::isAssignableFrom) - .map(clazz -> (Class) clazz) + .map(handlerClass -> (Class) handlerClass) .map(beanContext::findBean) .filter(Optional::isPresent) .map(Optional::get) @@ -160,7 +157,7 @@ private void registerListener(ExecutableMethod method, Arrays.stream(destinationAnnotation.classValues("successHandlers")), Arrays.stream(beanDefinition.classValues(JMSListener.class, "successHandlers"))) .filter(JMSListenerSuccessHandler.class::isAssignableFrom) - .map(clazz -> (Class) clazz) + .map(handlerClass -> (Class) handlerClass) .map(beanContext::findBean) .filter(Optional::isPresent) .map(Optional::get) diff --git a/jms-core/src/main/java/io/micronaut/jms/configuration/JMSQueueListenerMethodProcessor.java b/jms-core/src/main/java/io/micronaut/jms/configuration/JMSQueueListenerMethodProcessor.java index 27eed1c6..95271432 100644 --- a/jms-core/src/main/java/io/micronaut/jms/configuration/JMSQueueListenerMethodProcessor.java +++ b/jms-core/src/main/java/io/micronaut/jms/configuration/JMSQueueListenerMethodProcessor.java @@ -59,27 +59,36 @@ protected ExecutorService getExecutorService(AnnotationValue value) { final Optional concurrency = value.stringValue("concurrency"); if (executorName.isPresent() && !executorName.get().isEmpty()) { + + logger.warn("The deprecated 'executor' option of 'io.micronaut.jms.annotations.Queue' annotation is being used. Note that It will be removed soon."); + return beanContext.findBean(ExecutorService.class, Qualifiers.byName(executorName.get())) .orElseThrow(() -> new IllegalStateException( "No ExecutorService bean found with name " + executorName.get())); } - final Matcher matcher = CONCURRENCY_PATTERN.matcher(concurrency - .orElseThrow(() -> new IllegalStateException( - "Concurrency must be specified if ExecutorService is not specified"))); - Assert.isTrue(matcher.find() && matcher.groupCount() == 2, - () -> "Concurrency must be of the form int-int (e.g. \"1-10\"). " + - "Concurrency provided was " + concurrency.get()); - - int numThreads = Integer.parseInt(matcher.group(1)); - int maxThreads = Integer.parseInt(matcher.group(2)); - return new ThreadPoolExecutor( - numThreads, - maxThreads, - DEFAULT_KEEP_ALIVE_TIME, - MILLISECONDS, - new LinkedBlockingQueue<>(numThreads), - Executors.defaultThreadFactory()); + if (concurrency.isPresent()) { + + logger.warn("The deprecated 'concurrency' option of 'io.micronaut.jms.annotations.Queue' annotation is being used. Note that It will be removed soon."); + + final Matcher matcher = CONCURRENCY_PATTERN.matcher(concurrency.get()); + Assert.isTrue(matcher.find() && matcher.groupCount() == 2, + () -> "Concurrency must be of the form int-int (e.g. \"1-10\"). " + + "Concurrency provided was " + concurrency.get()); + + int numThreads = Integer.parseInt(matcher.group(1)); + int maxThreads = Integer.parseInt(matcher.group(2)); + + return new ThreadPoolExecutor( + numThreads, + maxThreads, + DEFAULT_KEEP_ALIVE_TIME, + MILLISECONDS, + new LinkedBlockingQueue<>(numThreads), + Executors.defaultThreadFactory()); + } + + return null; } @Override diff --git a/jms-core/src/main/java/io/micronaut/jms/configuration/JMSTopicListenerMethodProcessor.java b/jms-core/src/main/java/io/micronaut/jms/configuration/JMSTopicListenerMethodProcessor.java index e908b03c..44dd88c1 100644 --- a/jms-core/src/main/java/io/micronaut/jms/configuration/JMSTopicListenerMethodProcessor.java +++ b/jms-core/src/main/java/io/micronaut/jms/configuration/JMSTopicListenerMethodProcessor.java @@ -46,9 +46,16 @@ public JMSTopicListenerMethodProcessor(BeanContext beanContext, @Override protected ExecutorService getExecutorService(AnnotationValue value) { - final Optional executorName = value.stringValue("executor"); - if (executorName.isPresent() && !executorName.get().isEmpty()) { + + if (!executorName.isPresent()) { + return null; + } + + if (!executorName.get().isEmpty()) { + + logger.warn("The deprecated 'executor' option of 'io.micronaut.jms.annotations.Topic' annotation is being used. Note that It will be removed soon."); + return beanContext.findBean(ExecutorService.class, Qualifiers.byName(executorName.get())) .orElseThrow(() -> new IllegalStateException( "No ExecutorService bean found with name " + executorName.get())); diff --git a/jms-core/src/main/java/io/micronaut/jms/listener/JMSListener.java b/jms-core/src/main/java/io/micronaut/jms/listener/JMSListener.java index 4bc15908..83a96c5a 100644 --- a/jms-core/src/main/java/io/micronaut/jms/listener/JMSListener.java +++ b/jms-core/src/main/java/io/micronaut/jms/listener/JMSListener.java @@ -22,6 +22,7 @@ import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; @@ -45,7 +46,10 @@ * are called sequentially. If a handler throws an error then it is caught and once all handlers have completed, those errors * are rethrown * If any error is thrown during message handling (either by the listener itself, or by a success handler), then all the - * {@link JMSListenerErrorHandler}s are called sequentially. Error handlers cannot themselves throw errors. + * {@link JMSListenerErrorHandler}s are called sequentially. + * + * Note: To handle the special cases (e.g.the negative acknowledger feature in the AWS SQS base implementation), an error handler could throw an error to expose it to the base implementation. + * However, it must be ensured that the error handler executes after all other error handlers. * * @author Elliott Pope * @since 2.1.1 @@ -125,32 +129,41 @@ public void addErrorHandlers(Collection handl * @throws JMSException - if any JMS related exception occurs while configuring the listener. */ public void start() throws JMSException { - MessageConsumer consumer; + MessageConsumer messageConsumer; if (messageSelector.isPresent()) { - consumer = session.createConsumer(lookupDestination(destinationType, destination, session), messageSelector.get()); + messageConsumer = session.createConsumer(lookupDestination(destinationType, destination, session), messageSelector.get()); + } else { + messageConsumer = session.createConsumer(lookupDestination(destinationType, destination, session)); + } + + if (executor == null) { + messageConsumer.setMessageListener(this::handleMessage); } else { - consumer = session.createConsumer(lookupDestination(destinationType, destination, session)); + messageConsumer.setMessageListener(msg -> executor.submit( + () -> handleMessage(msg))); } - consumer.setMessageListener((msg) -> executor.submit(() -> { - try { - delegate.onMessage(msg); - Throwable ex = new Throwable(); - successHandlers.forEach(handler -> { - try { - handler.handle(session, msg); - } catch (JMSException e) { - LOGGER.error("Failed to handle successful message receive: " + e.getMessage(), e); - ex.addSuppressed(e); - } - }); - if (ex.getSuppressed().length > 0) { - errorHandlers.forEach(handler -> handler.handle(session, msg, ex)); + + this.consumer = messageConsumer; + } + + private void handleMessage(Message msg) { + try { + delegate.onMessage(msg); + Throwable ex = new Throwable(); + successHandlers.forEach(handler -> { + try { + handler.handle(session, msg); + } catch (JMSException e) { + LOGGER.error("Failed to handle successful message receive: " + e.getMessage(), e); + ex.addSuppressed(e); } - } catch (Throwable e) { - errorHandlers.forEach(handler -> handler.handle(session, msg, e)); + }); + if (ex.getSuppressed().length > 0) { + errorHandlers.forEach(handler -> handler.handle(session, msg, ex)); } - })); - this.consumer = consumer; + } catch (Exception e) { + errorHandlers.forEach(handler -> handler.handle(session, msg, e)); + } } /** diff --git a/jms-core/src/main/java/io/micronaut/jms/listener/JMSListenerRegistry.java b/jms-core/src/main/java/io/micronaut/jms/listener/JMSListenerRegistry.java index 48ab6a02..8b8c1a35 100644 --- a/jms-core/src/main/java/io/micronaut/jms/listener/JMSListenerRegistry.java +++ b/jms-core/src/main/java/io/micronaut/jms/listener/JMSListenerRegistry.java @@ -64,7 +64,6 @@ public JMSListenerRegistry( * @throws JMSException - if the listener fails to start */ public void register(JMSListener listener, boolean autoStart) throws JMSException { - listener.addErrorHandlers(new LoggingJMSListenerErrorHandler()); if (autoStart) { listener.start(); } diff --git a/settings.gradle b/settings.gradle index 6375bd51..20dea47d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -15,7 +15,6 @@ include 'jms-core' include 'jms-activemq-classic' include 'jms-activemq-artemis' include 'jms-sqs' -include 'jms-test' include 'docs-examples:example-groovy' include 'docs-examples:example-java' include 'docs-examples:example-kotlin'