Skip to content

Commit

Permalink
The AWS SQS negative acknowledger usage bug fix (#335)
Browse files Browse the repository at this point in the history
Fixes #170
  • Loading branch information
sbodvanski authored Feb 23, 2023
1 parent 6232ffe commit 5ce9ba2
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,12 @@
* option cannot be used in conjunction with {@link Queue#executor()}; if
* both are specified the {@link Queue#executor()} value will be used.
*
* @deprecated since 3.0.0, to align the implementation with the JMS model and the messaging libraries' presumptions.
*
*
* @return the initial and max size of the thread pool
*/
@Deprecated
String concurrency() default "1-1";

/**
Expand All @@ -102,8 +106,11 @@
* as part of a {@link JMSListener}. The executor can be maintained by
* Micronaut using the {@link io.micronaut.scheduling.executor.UserExecutorConfiguration}.
*
* @deprecated since 3.0.0, to align the implementation with the JMS model and the messaging libraries' presumptions.
*
* @return the executor service bean name
*/
@Deprecated
String executor() default "";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@
* as part of a {@link JMSListener}. The executor can be maintained by
* Micronaut using the {@link io.micronaut.scheduling.executor.UserExecutorConfiguration}.
*
* @deprecated since 3.0.0, to align the implementation with the JMS model and the messaging libraries' presumptions.
*
*
* @return the executor service name
*/
@Deprecated
String executor() default "";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static javax.jms.Session.CLIENT_ACKNOWLEDGE;

/**
* Abstract {@link ExecutableMethodProcessor} for annotations related to
* {@link JMSListener}. Registers a {@link io.micronaut.jms.listener.JMSListener}
Expand Down Expand Up @@ -113,8 +111,7 @@ private void validateArguments(ExecutableMethod<?, ?> method) {
}

private MessageListener generateAndBindListener(Object bean,
Executable<?, ?> method,
boolean acknowledge) {
Executable<?, ?> method) {

return message -> {
DefaultExecutableBinder<Message> binder = new DefaultExecutableBinder<>();
Expand Down Expand Up @@ -144,13 +141,13 @@ private void registerListener(ExecutableMethod<?, ?> method,
final Object bean = beanContext.getBean(beanDefinition.getBeanType());
final ExecutorService executor = getExecutorService(destinationAnnotation);

MessageListener listener = generateAndBindListener(bean, method, CLIENT_ACKNOWLEDGE == acknowledgeMode);
MessageListener listener = generateAndBindListener(bean, method);

Set<JMSListenerErrorHandler> errorHandlers = Stream.concat(
Arrays.stream(destinationAnnotation.classValues("errorHandlers")),
Arrays.stream(beanDefinition.classValues(JMSListener.class, "errorHandlers")))
.filter(JMSListenerErrorHandler.class::isAssignableFrom)
.map(clazz -> (Class<? extends JMSListenerErrorHandler>) clazz)
.map(handlerClass -> (Class<? extends JMSListenerErrorHandler>) handlerClass)
.map(beanContext::findBean)
.filter(Optional::isPresent)
.map(Optional::get)
Expand All @@ -160,7 +157,7 @@ private void registerListener(ExecutableMethod<?, ?> method,
Arrays.stream(destinationAnnotation.classValues("successHandlers")),
Arrays.stream(beanDefinition.classValues(JMSListener.class, "successHandlers")))
.filter(JMSListenerSuccessHandler.class::isAssignableFrom)
.map(clazz -> (Class<? extends JMSListenerSuccessHandler>) clazz)
.map(handlerClass -> (Class<? extends JMSListenerSuccessHandler>) handlerClass)
.map(beanContext::findBean)
.filter(Optional::isPresent)
.map(Optional::get)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,27 +59,36 @@ protected ExecutorService getExecutorService(AnnotationValue<Queue> value) {
final Optional<String> concurrency = value.stringValue("concurrency");

if (executorName.isPresent() && !executorName.get().isEmpty()) {

logger.warn("The deprecated 'executor' option of 'io.micronaut.jms.annotations.Queue' annotation is being used. Note that It will be removed soon.");

return beanContext.findBean(ExecutorService.class, Qualifiers.byName(executorName.get()))
.orElseThrow(() -> new IllegalStateException(
"No ExecutorService bean found with name " + executorName.get()));
}

final Matcher matcher = CONCURRENCY_PATTERN.matcher(concurrency
.orElseThrow(() -> new IllegalStateException(
"Concurrency must be specified if ExecutorService is not specified")));
Assert.isTrue(matcher.find() && matcher.groupCount() == 2,
() -> "Concurrency must be of the form int-int (e.g. \"1-10\"). " +
"Concurrency provided was " + concurrency.get());

int numThreads = Integer.parseInt(matcher.group(1));
int maxThreads = Integer.parseInt(matcher.group(2));
return new ThreadPoolExecutor(
numThreads,
maxThreads,
DEFAULT_KEEP_ALIVE_TIME,
MILLISECONDS,
new LinkedBlockingQueue<>(numThreads),
Executors.defaultThreadFactory());
if (concurrency.isPresent()) {

logger.warn("The deprecated 'concurrency' option of 'io.micronaut.jms.annotations.Queue' annotation is being used. Note that It will be removed soon.");

final Matcher matcher = CONCURRENCY_PATTERN.matcher(concurrency.get());
Assert.isTrue(matcher.find() && matcher.groupCount() == 2,
() -> "Concurrency must be of the form int-int (e.g. \"1-10\"). " +
"Concurrency provided was " + concurrency.get());

int numThreads = Integer.parseInt(matcher.group(1));
int maxThreads = Integer.parseInt(matcher.group(2));

return new ThreadPoolExecutor(
numThreads,
maxThreads,
DEFAULT_KEEP_ALIVE_TIME,
MILLISECONDS,
new LinkedBlockingQueue<>(numThreads),
Executors.defaultThreadFactory());
}

return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,16 @@ public JMSTopicListenerMethodProcessor(BeanContext beanContext,

@Override
protected ExecutorService getExecutorService(AnnotationValue<Topic> value) {

final Optional<String> executorName = value.stringValue("executor");
if (executorName.isPresent() && !executorName.get().isEmpty()) {

if (!executorName.isPresent()) {
return null;
}

if (!executorName.get().isEmpty()) {

logger.warn("The deprecated 'executor' option of 'io.micronaut.jms.annotations.Topic' annotation is being used. Note that It will be removed soon.");

return beanContext.findBean(ExecutorService.class, Qualifiers.byName(executorName.get()))
.orElseThrow(() -> new IllegalStateException(
"No ExecutorService bean found with name " + executorName.get()));
Expand Down
57 changes: 35 additions & 22 deletions jms-core/src/main/java/io/micronaut/jms/listener/JMSListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
Expand All @@ -45,7 +46,10 @@
* are called sequentially. If a handler throws an error then it is caught and once all handlers have completed, those errors
* are rethrown
* If any error is thrown during message handling (either by the listener itself, or by a success handler), then all the
* {@link JMSListenerErrorHandler}s are called sequentially. Error handlers cannot themselves throw errors.
* {@link JMSListenerErrorHandler}s are called sequentially.
*
* Note: To handle the special cases (e.g.the negative acknowledger feature in the AWS SQS base implementation), an error handler could throw an error to expose it to the base implementation.
* However, it must be ensured that the error handler executes after all other error handlers.
*
* @author Elliott Pope
* @since 2.1.1
Expand Down Expand Up @@ -125,32 +129,41 @@ public void addErrorHandlers(Collection<? extends JMSListenerErrorHandler> handl
* @throws JMSException - if any JMS related exception occurs while configuring the listener.
*/
public void start() throws JMSException {
MessageConsumer consumer;
MessageConsumer messageConsumer;
if (messageSelector.isPresent()) {
consumer = session.createConsumer(lookupDestination(destinationType, destination, session), messageSelector.get());
messageConsumer = session.createConsumer(lookupDestination(destinationType, destination, session), messageSelector.get());
} else {
messageConsumer = session.createConsumer(lookupDestination(destinationType, destination, session));
}

if (executor == null) {
messageConsumer.setMessageListener(this::handleMessage);
} else {
consumer = session.createConsumer(lookupDestination(destinationType, destination, session));
messageConsumer.setMessageListener(msg -> executor.submit(
() -> handleMessage(msg)));
}
consumer.setMessageListener((msg) -> executor.submit(() -> {
try {
delegate.onMessage(msg);
Throwable ex = new Throwable();
successHandlers.forEach(handler -> {
try {
handler.handle(session, msg);
} catch (JMSException e) {
LOGGER.error("Failed to handle successful message receive: " + e.getMessage(), e);
ex.addSuppressed(e);
}
});
if (ex.getSuppressed().length > 0) {
errorHandlers.forEach(handler -> handler.handle(session, msg, ex));

this.consumer = messageConsumer;
}

private void handleMessage(Message msg) {
try {
delegate.onMessage(msg);
Throwable ex = new Throwable();
successHandlers.forEach(handler -> {
try {
handler.handle(session, msg);
} catch (JMSException e) {
LOGGER.error("Failed to handle successful message receive: " + e.getMessage(), e);
ex.addSuppressed(e);
}
} catch (Throwable e) {
errorHandlers.forEach(handler -> handler.handle(session, msg, e));
});
if (ex.getSuppressed().length > 0) {
errorHandlers.forEach(handler -> handler.handle(session, msg, ex));
}
}));
this.consumer = consumer;
} catch (Exception e) {
errorHandlers.forEach(handler -> handler.handle(session, msg, e));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public JMSListenerRegistry(
* @throws JMSException - if the listener fails to start
*/
public void register(JMSListener listener, boolean autoStart) throws JMSException {
listener.addErrorHandlers(new LoggingJMSListenerErrorHandler());
if (autoStart) {
listener.start();
}
Expand Down
1 change: 0 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ include 'jms-core'
include 'jms-activemq-classic'
include 'jms-activemq-artemis'
include 'jms-sqs'
include 'jms-test'
include 'docs-examples:example-groovy'
include 'docs-examples:example-java'
include 'docs-examples:example-kotlin'
Expand Down

0 comments on commit 5ce9ba2

Please sign in to comment.