Skip to content

Commit

Permalink
SMLC: defer counter release until fetched messages are processed
Browse files Browse the repository at this point in the history
Currently, the `SimpleMessageListenerContainer` is stopped immediately
when `cancelOK` is received.

The expectation do not stop an application context until all the fetched
messages are processed.
Therefore, move `this.activeObjectCounter.release(this);` to the `BlockingQueueConsumer.nextMessage()`
if the internal queue is empty and `cancelled` has been requested.

* Adjust all the SMLC tests for a shorter `receiveTimeout` to not have blocking for nothing
* Also add `System.setProperty("spring.amqp.deserialization.trust.all", "true");` to be
able to run tests from IDE

**Cherry-pick to `3.0.x`**
  • Loading branch information
artembilan committed Feb 5, 2024
1 parent aa46e3c commit ca51ca6
Show file tree
Hide file tree
Showing 35 changed files with 174 additions and 105 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -135,6 +135,7 @@ public String baz(String in) {
public SimpleMessageListenerContainer smlc1() throws IOException {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueueNames("foo", "bar");
container.setReceiveTimeout(10);
container.setMessageListener(new MessageListenerAdapter(new Object() {

@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ public Message nextMessage(long timeout) throws InterruptedException, ShutdownSi
}
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
if (message == null && this.cancelled.get()) {
this.activeObjectCounter.release(this);
throw new ConsumerCancelledException();
}
return message;
Expand Down Expand Up @@ -990,7 +991,6 @@ public void handleCancelOk(String consumerTag) {
+ "); " + BlockingQueueConsumer.this);
}
this.canceled = true;
BlockingQueueConsumer.this.activeObjectCounter.release(BlockingQueueConsumer.this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -382,17 +382,17 @@ public void testStopCancelled() throws Exception {
@Test
void ctorCoverage() {
AsyncRabbitTemplate template = new AsyncRabbitTemplate(mock(ConnectionFactory.class), "ex", "rk");
assertThat(template).extracting(t -> t.getRabbitTemplate())
assertThat(template).extracting(AsyncRabbitTemplate::getRabbitTemplate)
.extracting("exchange")
.isEqualTo("ex");
assertThat(template).extracting(t -> t.getRabbitTemplate())
assertThat(template).extracting(AsyncRabbitTemplate::getRabbitTemplate)
.extracting("routingKey")
.isEqualTo("rk");
template = new AsyncRabbitTemplate(mock(ConnectionFactory.class), "ex", "rk", "rq");
assertThat(template).extracting(t -> t.getRabbitTemplate())
assertThat(template).extracting(AsyncRabbitTemplate::getRabbitTemplate)
.extracting("exchange")
.isEqualTo("ex");
assertThat(template).extracting(t -> t.getRabbitTemplate())
assertThat(template).extracting(AsyncRabbitTemplate::getRabbitTemplate)
.extracting("routingKey")
.isEqualTo("rk");
assertThat(template)
Expand All @@ -402,10 +402,10 @@ void ctorCoverage() {
.extracting("queueNames")
.isEqualTo(new String[] { "rq" });
template = new AsyncRabbitTemplate(mock(ConnectionFactory.class), "ex", "rk", "rq", "ra");
assertThat(template).extracting(t -> t.getRabbitTemplate())
assertThat(template).extracting(AsyncRabbitTemplate::getRabbitTemplate)
.extracting("exchange")
.isEqualTo("ex");
assertThat(template).extracting(t -> t.getRabbitTemplate())
assertThat(template).extracting(AsyncRabbitTemplate::getRabbitTemplate)
.extracting("routingKey")
.isEqualTo("rk");
assertThat(template)
Expand Down Expand Up @@ -522,6 +522,7 @@ public RabbitTemplate templateForDirect(ConnectionFactory connectionFactory) {
@Primary
public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setReceiveTimeout(10);
container.setAfterReceivePostProcessors(new GUnzipPostProcessor());
container.setQueueNames(replies().getName());
return container;
Expand All @@ -540,6 +541,7 @@ public AsyncRabbitTemplate asyncDirectTemplate(RabbitTemplate templateForDirect)
@Bean
public SimpleMessageListenerContainer remoteContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setReceiveTimeout(10);
container.setQueueNames(requests().getName());
container.setAfterReceivePostProcessors(new GUnzipPostProcessor());
MessageListenerAdapter messageListener =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2020 the original author or authors.
* Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -142,6 +142,7 @@ public void testFullConfiguration(ApplicationContext context) {
// Resolve the container and invoke a message on it

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setReceiveTimeout(10);
endpoint.setupListenerContainer(container);
MessagingMessageListenerAdapter listener = (MessagingMessageListenerAdapter) container.getMessageListener();

Expand All @@ -158,9 +159,9 @@ public void testFullConfiguration(ApplicationContext context) {
}

/**
* Test for {@link CustomBean} and an manually endpoint registered
* Test for {@link CustomBean} and a manually registered endpoint
* with "myCustomEndpointId". The custom endpoint does not provide
* any factory so it's registered with the default one
* any factory, so it's registered with the default one
*/
public void testCustomConfiguration(ApplicationContext context) {
RabbitListenerContainerTestFactory defaultFactory =
Expand All @@ -171,14 +172,15 @@ public void testCustomConfiguration(ApplicationContext context) {
assertThat(customFactory.getListenerContainers()).hasSize(1);
RabbitListenerEndpoint endpoint = defaultFactory.getListenerContainers().get(0).getEndpoint();
assertThat(endpoint.getClass()).as("Wrong endpoint type").isEqualTo(SimpleRabbitListenerEndpoint.class);
assertThat(((SimpleRabbitListenerEndpoint) endpoint).getMessageListener()).as("Wrong listener set in custom endpoint").isEqualTo(context.getBean("simpleMessageListener"));
assertThat(((SimpleRabbitListenerEndpoint) endpoint).getMessageListener())
.as("Wrong listener set in custom endpoint").isEqualTo(context.getBean("simpleMessageListener"));

RabbitListenerEndpointRegistry customRegistry =
context.getBean("customRegistry", RabbitListenerEndpointRegistry.class);
assertThat(customRegistry.getListenerContainerIds().size()).as("Wrong number of containers in the registry").isEqualTo(2);
assertThat(customRegistry.getListenerContainers().size()).as("Wrong number of containers in the registry").isEqualTo(2);
assertThat(customRegistry.getListenerContainer("listenerId")).as("Container with custom id on the annotation should be found").isNotNull();
assertThat(customRegistry.getListenerContainer("myCustomEndpointId")).as("Container created with custom id should be found").isNotNull();
assertThat(customRegistry.getListenerContainerIds()).hasSize(2);
assertThat(customRegistry.getListenerContainers()).hasSize(2);
assertThat(customRegistry.getListenerContainer("listenerId")).isNotNull();
assertThat(customRegistry.getListenerContainer("myCustomEndpointId")).isNotNull();
}

/**
Expand All @@ -205,7 +207,6 @@ public void testDefaultContainerFactoryConfiguration(ApplicationContext context)
/**
* Test for {@link ValidationBean} with a validator ({@link TestValidator}) specified
* in a custom {@link org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory}.
*
* The test should throw a {@link org.springframework.amqp.rabbit.support.ListenerExecutionFailedException}
*/
public void testRabbitHandlerMethodFactoryConfiguration(ApplicationContext context) throws Exception {
Expand All @@ -216,6 +217,7 @@ public void testRabbitHandlerMethodFactoryConfiguration(ApplicationContext conte
simpleFactory.getListenerContainers().get(0).getEndpoint();

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setReceiveTimeout(10);
endpoint.setupListenerContainer(container);
MessagingMessageListenerAdapter listener = (MessagingMessageListenerAdapter) container.getMessageListener();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2023 the original author or authors.
* Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,6 +41,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -248,6 +249,7 @@ public class EnableRabbitIntegrationTests extends NeedsManagementTests {
public static void setUp() {
System.setProperty(RabbitListenerAnnotationBeanPostProcessor.RABBIT_EMPTY_STRING_ARGUMENTS_PROPERTY,
"test-empty");
System.setProperty("spring.amqp.deserialization.trust.all", "true");
RabbitAvailableCondition.getBrokerRunning().removeExchanges("auto.exch.tx",
"auto.exch",
"auto.exch.fanout",
Expand Down Expand Up @@ -826,7 +828,7 @@ public void testMeta() throws Exception {
}

@Test
public void testHeadersExchange() throws Exception {
public void testHeadersExchange() {
assertThat(rabbitTemplate.convertSendAndReceive("auto.headers", "", "foo",
message -> {
message.getMessageProperties().getHeaders().put("foo", "bar");
Expand All @@ -845,7 +847,7 @@ public void deadLetterOnDefaultExchange() {
this.rabbitTemplate.convertAndSend("amqp656", "foo");
assertThat(this.rabbitTemplate.receiveAndConvert("amqp656dlq", 10000)).isEqualTo("foo");
try {
Map<String, Object> amqp656 = await().until(() -> queueInfo("amqp656"), q -> q != null);
Map<String, Object> amqp656 = await().until(() -> queueInfo("amqp656"), Objects::nonNull);
if (amqp656 != null) {
assertThat(arguments(amqp656).get("test-empty")).isEqualTo("");
assertThat(arguments(amqp656).get("test-null")).isEqualTo("undefined");
Expand Down Expand Up @@ -960,7 +962,7 @@ public void messagingMessageReturned() throws InterruptedException {
catch (@SuppressWarnings("unused") Exception e) {
return null;
}
}, tim -> tim != null);
}, Objects::nonNull);
assertThat(timer.count()).isEqualTo(1L);
}

Expand Down Expand Up @@ -1786,6 +1788,7 @@ public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
factory.setBatchListener(true);
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
factory.setReceiveTimeout(10L);
return factory;
}

Expand Down Expand Up @@ -1861,7 +1864,7 @@ public CountDownLatch errorHandlerLatch2() {

@Bean
public AtomicReference<Throwable> errorHandlerError() {
return new AtomicReference<Throwable>();
return new AtomicReference<>();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -83,6 +83,7 @@ void multipleSimpleMessageListeners() {
Assertions.assertThat(methodEndpoint.getMethod()).isNotNull();

SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setReceiveTimeout(10);
methodEndpoint.setupListenerContainer(listenerContainer);
Assertions.assertThat(listenerContainer.getMessageListener()).isNotNull();
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -86,6 +86,7 @@ public void simpleMessageListener() {
assertThat(methodEndpoint.getMethod()).isNotNull();

SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setReceiveTimeout(10);
methodEndpoint.setupListenerContainer(listenerContainer);
assertThat(listenerContainer.getMessageListener()).isNotNull();

Expand Down Expand Up @@ -114,6 +115,7 @@ public void simpleMessageListenerWithMixedAnnotations() {
assertThat(iterator.next()).isEqualTo("secondQueue");

SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setReceiveTimeout(10);
methodEndpoint.setupListenerContainer(listenerContainer);
assertThat(listenerContainer.getMessageListener()).isNotNull();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,11 +63,11 @@
*/
public class LocalizedQueueConnectionFactoryTests {

private final Map<String, Channel> channels = new HashMap<String, Channel>();
private final Map<String, Channel> channels = new HashMap<>();

private final Map<String, Consumer> consumers = new HashMap<String, Consumer>();
private final Map<String, Consumer> consumers = new HashMap<>();

private final Map<String, String> consumerTags = new HashMap<String, String>();
private final Map<String, String> consumerTags = new HashMap<>();

@Test
public void testFailOver() throws Exception {
Expand All @@ -83,7 +83,7 @@ public void testFailOver() throws Exception {
final AtomicBoolean firstServer = new AtomicBoolean(true);
final WebClient client1 = doCreateClient(adminUris[0], username, password, nodes[0]);
final WebClient client2 = doCreateClient(adminUris[1], username, password, nodes[1]);
final Map<String, ConnectionFactory> mockCFs = new HashMap<String, ConnectionFactory>();
final Map<String, ConnectionFactory> mockCFs = new HashMap<>();
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
mockCFs.put(rabbit1, mockCF(rabbit1, latch1));
Expand Down Expand Up @@ -116,6 +116,7 @@ public WebClient createClient(String username, String password) {
willAnswer(new CallsRealMethods()).given(logger).debug(anyString());
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(lqcf);
container.setReceiveTimeout(10);
container.setQueueNames("q");
container.afterPropertiesSet();
container.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -167,7 +167,7 @@ protected Object determineCurrentLookupKey() {
public void testAbstractRoutingConnectionFactoryWithListenerContainer() {
ConnectionFactory connectionFactory1 = mock(ConnectionFactory.class);
ConnectionFactory connectionFactory2 = mock(ConnectionFactory.class);
Map<Object, ConnectionFactory> factories = new HashMap<Object, ConnectionFactory>(2);
Map<Object, ConnectionFactory> factories = new HashMap<>(2);
factories.put("[baz]", connectionFactory1);
factories.put("[foo,bar]", connectionFactory2);
ConnectionFactory defaultConnectionFactory = mock(ConnectionFactory.class);
Expand All @@ -178,6 +178,7 @@ public void testAbstractRoutingConnectionFactoryWithListenerContainer() {
connectionFactory.setTargetConnectionFactories(factories);

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setReceiveTimeout(10);
container.setQueueNames("foo", "bar");
container.afterPropertiesSet();
container.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2022 the original author or authors.
* Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -236,14 +236,14 @@ public void testSimpleBatchTwoEqualBufferLimit() throws Exception {
@Test
void testDebatchSMLCSplit() throws Exception {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactory);
container.setReceiveTimeout(100);
container.setReceiveTimeout(10);
testDebatchByContainer(container, false);
}

@Test
void testDebatchSMLC() throws Exception {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactory);
container.setReceiveTimeout(100);
container.setReceiveTimeout(10);
testDebatchByContainer(container, true);
}

Expand Down Expand Up @@ -303,16 +303,16 @@ private void testDebatchByContainer(AbstractMessageListenerContainer container,

@Test
public void testDebatchByContainerPerformance() throws Exception {
final List<Message> received = new ArrayList<Message>();
final List<Message> received = new ArrayList<>();
int count = 100000;
final CountDownLatch latch = new CountDownLatch(count);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactory);
container.setQueueNames(ROUTE);
container.setMessageListener((MessageListener) message -> {
container.setMessageListener(message -> {
received.add(message);
latch.countDown();
});
container.setReceiveTimeout(100);
container.setReceiveTimeout(10);
container.setPrefetchCount(1000);
container.setBatchSize(1000);
container.afterPropertiesSet();
Expand Down Expand Up @@ -344,8 +344,8 @@ public void testDebatchByContainerPerformance() throws Exception {
public void testDebatchByContainerBadMessageRejected() throws Exception {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactory);
container.setQueueNames(ROUTE);
container.setMessageListener((MessageListener) message -> { });
container.setReceiveTimeout(100);
container.setMessageListener(message -> { });
container.setReceiveTimeout(10);
ConditionalRejectingErrorHandler errorHandler = new ConditionalRejectingErrorHandler();
container.setErrorHandler(errorHandler);
container.afterPropertiesSet();
Expand Down Expand Up @@ -632,15 +632,15 @@ private Message receive(BatchingRabbitTemplate template) throws InterruptedExcep

@Test
public void testCompressionWithContainer() throws Exception {
final List<Message> received = new ArrayList<Message>();
final List<Message> received = new ArrayList<>();
final CountDownLatch latch = new CountDownLatch(2);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactory);
container.setQueueNames(ROUTE);
container.setMessageListener((MessageListener) message -> {
container.setMessageListener(message -> {
received.add(message);
latch.countDown();
});
container.setReceiveTimeout(100);
container.setReceiveTimeout(10);
container.setAfterReceivePostProcessors(new DelegatingDecompressingPostProcessor());
container.afterPropertiesSet();
container.start();
Expand Down
Loading

0 comments on commit ca51ca6

Please sign in to comment.