- * N.B. this is not an {@link AmqpException} because it is a a client exception, not a protocol or broker
+ * N.B. this is not an {@link AmqpException} because it is a client exception, not a protocol or broker
* problem.
*
*
diff --git a/spring-amqp/src/main/java/org/springframework/amqp/support/converter/MessagingMessageConverter.java b/spring-amqp/src/main/java/org/springframework/amqp/support/converter/MessagingMessageConverter.java
index 0c91aac106..5bc545086b 100644
--- a/spring-amqp/src/main/java/org/springframework/amqp/support/converter/MessagingMessageConverter.java
+++ b/spring-amqp/src/main/java/org/springframework/amqp/support/converter/MessagingMessageConverter.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2020 the original author or authors.
+ * Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,6 +40,7 @@
* is considered to be a request).
*
* @author Stephane Nicoll
+ * @author Ngoc Nhan
* @since 1.4
*/
public class MessagingMessageConverter implements MessageConverter, InitializingBean {
@@ -104,11 +105,10 @@ public void afterPropertiesSet() {
public org.springframework.amqp.core.Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException {
- if (!(object instanceof Message)) {
+ if (!(object instanceof Message> input)) {
throw new IllegalArgumentException("Could not convert [" + object + "] - only [" +
Message.class.getName() + "] is handled by this converter");
}
- Message> input = (Message>) object;
this.headerMapper.fromHeaders(input.getHeaders(), messageProperties);
org.springframework.amqp.core.Message amqpMessage = this.payloadConverter.toMessage(
input.getPayload(), messageProperties);
diff --git a/spring-amqp/src/main/java/org/springframework/amqp/support/converter/RemoteInvocationResult.java b/spring-amqp/src/main/java/org/springframework/amqp/support/converter/RemoteInvocationResult.java
index 7d08e1d139..6945162895 100644
--- a/spring-amqp/src/main/java/org/springframework/amqp/support/converter/RemoteInvocationResult.java
+++ b/spring-amqp/src/main/java/org/springframework/amqp/support/converter/RemoteInvocationResult.java
@@ -26,6 +26,7 @@
*
* @author Juergen Hoeller
* @author Gary Russell
+ * @author Ngoc Nhan
* @since 3.0
*/
public class RemoteInvocationResult implements Serializable {
@@ -142,16 +143,13 @@ public boolean hasInvocationTargetException() {
@Nullable
public Object recreate() throws Throwable {
if (this.exception != null) {
- Throwable exToThrow = this.exception;
- if (this.exception instanceof InvocationTargetException invocationTargetException) {
- exToThrow = invocationTargetException.getTargetException();
- }
+ Throwable exToThrow = this.exception instanceof InvocationTargetException invocationTargetException
+ ? invocationTargetException.getTargetException()
+ : this.exception;
RemoteInvocationUtils.fillInClientStackTraceIfPossible(exToThrow);
throw exToThrow;
}
- else {
- return this.value;
- }
+ return this.value;
}
}
diff --git a/spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/AbstractDecompressingPostProcessor.java b/spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/AbstractDecompressingPostProcessor.java
index 249f0e25e4..3c87829cff 100644
--- a/spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/AbstractDecompressingPostProcessor.java
+++ b/spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/AbstractDecompressingPostProcessor.java
@@ -38,6 +38,7 @@
* the final content encoding of the decompressed message.
*
* @author Gary Russell
+ * @author Ngoc Nhan
* @since 1.4.2
*/
public abstract class AbstractDecompressingPostProcessor implements MessagePostProcessor, Ordered {
@@ -115,9 +116,8 @@ public Message postProcessMessage(Message message) throws AmqpException {
throw new AmqpIOException(e);
}
}
- else {
- return message;
- }
+
+ return message;
}
/**
diff --git a/spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/DelegatingDecompressingPostProcessor.java b/spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/DelegatingDecompressingPostProcessor.java
index e3e247f943..0651734b8c 100644
--- a/spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/DelegatingDecompressingPostProcessor.java
+++ b/spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/DelegatingDecompressingPostProcessor.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2021 the original author or authors.
+ * Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,11 +30,12 @@
*
* @author Gary Russell
* @author David Diehl
+ * @author Ngoc Nhan
* @since 1.4.2
*/
public class DelegatingDecompressingPostProcessor implements MessagePostProcessor, Ordered {
- private final Map decompressors = new HashMap();
+ private final Map decompressors = new HashMap<>();
private int order;
@@ -97,22 +98,20 @@ public Message postProcessMessage(Message message) throws AmqpException {
if (encoding == null) {
return message;
}
- else {
- int delimAt = encoding.indexOf(':');
- if (delimAt < 0) {
- delimAt = encoding.indexOf(',');
- }
- if (delimAt > 0) {
- encoding = encoding.substring(0, delimAt);
- }
- MessagePostProcessor decompressor = this.decompressors.get(encoding);
- if (decompressor != null) {
- return decompressor.postProcessMessage(message);
- }
- else {
- return message;
- }
+
+ int delimAt = encoding.indexOf(':');
+ if (delimAt < 0) {
+ delimAt = encoding.indexOf(',');
+ }
+ if (delimAt > 0) {
+ encoding = encoding.substring(0, delimAt);
}
+ MessagePostProcessor decompressor = this.decompressors.get(encoding);
+ if (decompressor != null) {
+ return decompressor.postProcessMessage(message);
+ }
+
+ return message;
}
}
diff --git a/spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/MessagePostProcessorUtils.java b/spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/MessagePostProcessorUtils.java
index bf2ed742d8..2c7a8f8b05 100644
--- a/spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/MessagePostProcessorUtils.java
+++ b/spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/MessagePostProcessorUtils.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2019 the original author or authors.
+ * Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,15 +29,16 @@
* Utilities for message post processors.
*
* @author Gary Russell
+ * @author Ngoc Nhan
* @since 1.4.2
*
*/
public final class MessagePostProcessorUtils {
public static Collection sort(Collection processors) {
- List priorityOrdered = new ArrayList();
- List ordered = new ArrayList();
- List unOrdered = new ArrayList();
+ List priorityOrdered = new ArrayList<>();
+ List ordered = new ArrayList<>();
+ List unOrdered = new ArrayList<>();
for (MessagePostProcessor processor : processors) {
if (processor instanceof PriorityOrdered) {
priorityOrdered.add(processor);
@@ -49,7 +50,7 @@ else if (processor instanceof Ordered) {
unOrdered.add(processor);
}
}
- List sorted = new ArrayList();
+ List sorted = new ArrayList<>();
OrderComparator.sort(priorityOrdered);
sorted.addAll(priorityOrdered);
OrderComparator.sort(ordered);
diff --git a/spring-amqp/src/main/java/org/springframework/amqp/utils/MapBuilder.java b/spring-amqp/src/main/java/org/springframework/amqp/utils/MapBuilder.java
index 1fea0e2d65..e3afc18117 100644
--- a/spring-amqp/src/main/java/org/springframework/amqp/utils/MapBuilder.java
+++ b/spring-amqp/src/main/java/org/springframework/amqp/utils/MapBuilder.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 the original author or authors.
+ * Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,11 +26,12 @@
* @param the value type.
* @author Artem Bilan
* @author Gary Russell
+ * @author Ngoc Nhan
* @since 2.0
*/
public class MapBuilder, K, V> {
- private final Map map = new HashMap();
+ private final Map map = new HashMap<>();
public B put(K key, V value) {
this.map.put(key, value);
diff --git a/spring-amqp/src/main/java/org/springframework/amqp/utils/test/TestUtils.java b/spring-amqp/src/main/java/org/springframework/amqp/utils/test/TestUtils.java
index 5934aa6a6a..f8c802e558 100644
--- a/spring-amqp/src/main/java/org/springframework/amqp/utils/test/TestUtils.java
+++ b/spring-amqp/src/main/java/org/springframework/amqp/utils/test/TestUtils.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2013-2019 the original author or authors.
+ * Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
* @author Iwein Fuld
* @author Oleg Zhurakousky
* @author Gary Russell
+ * @author Ngoc Nhan
* @since 1.2
*/
public final class TestUtils {
@@ -47,13 +48,14 @@ public static Object getPropertyValue(Object root, String propertyPath) {
value = accessor.getPropertyValue(tokens[i]);
if (value != null) {
accessor = new DirectFieldAccessor(value);
+ continue;
}
- else if (i == tokens.length - 1) {
+
+ if (i == tokens.length - 1) {
return null;
}
- else {
- throw new IllegalArgumentException("intermediate property '" + tokens[i] + "' is null");
- }
+
+ throw new IllegalArgumentException("intermediate property '" + tokens[i] + "' is null");
}
return value;
}
diff --git a/spring-amqp/src/test/java/org/springframework/amqp/core/AddressTests.java b/spring-amqp/src/test/java/org/springframework/amqp/core/AddressTests.java
index feaea573b2..17c24e4130 100644
--- a/spring-amqp/src/test/java/org/springframework/amqp/core/AddressTests.java
+++ b/spring-amqp/src/test/java/org/springframework/amqp/core/AddressTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
* @author Mark Fisher
* @author Artem Bilan
* @author Gary Russell
+ * @author Ngoc Nhan
*/
public class AddressTests {
@@ -100,6 +101,9 @@ public void testDirectReplyTo() {
@Test
public void testEquals() {
assertThat(new Address("foo/bar")).isEqualTo(new Address("foo/bar"));
+ assertThat(new Address("foo", null)).isEqualTo(new Address("foo", null));
+ assertThat(new Address(null, "bar")).isEqualTo(new Address(null, "bar"));
+ assertThat(new Address(null, null)).isEqualTo(new Address(null, null));
}
}
diff --git a/spring-amqp/src/test/java/org/springframework/amqp/support/converter/Jackson2JsonMessageConverterTests.java b/spring-amqp/src/test/java/org/springframework/amqp/support/converter/Jackson2JsonMessageConverterTests.java
index 37e274d314..574fecc06f 100644
--- a/spring-amqp/src/test/java/org/springframework/amqp/support/converter/Jackson2JsonMessageConverterTests.java
+++ b/spring-amqp/src/test/java/org/springframework/amqp/support/converter/Jackson2JsonMessageConverterTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2022 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -34,6 +34,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.data.web.JsonPath;
+import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.util.MimeTypeUtils;
@@ -54,6 +55,7 @@
* @author Artem Bilan
*/
@SpringJUnitConfig
+@DirtiesContext
public class Jackson2JsonMessageConverterTests {
public static final String TRUSTED_PACKAGE = Jackson2JsonMessageConverterTests.class.getPackage().getName();
diff --git a/spring-amqp/src/test/java/org/springframework/amqp/support/converter/Jackson2XmlMessageConverterTests.java b/spring-amqp/src/test/java/org/springframework/amqp/support/converter/Jackson2XmlMessageConverterTests.java
index d5e3825a0c..836ee45ca0 100644
--- a/spring-amqp/src/test/java/org/springframework/amqp/support/converter/Jackson2XmlMessageConverterTests.java
+++ b/spring-amqp/src/test/java/org/springframework/amqp/support/converter/Jackson2XmlMessageConverterTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2019 the original author or authors.
+ * Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import com.fasterxml.jackson.databind.ser.BeanSerializerFactory;
@@ -43,6 +44,7 @@
* @since 2.1
*/
@SpringJUnitConfig
+@DirtiesContext
public class Jackson2XmlMessageConverterTests {
public static final String TRUSTED_PACKAGE = Jackson2XmlMessageConverterTests.class.getPackage().getName();
diff --git a/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/LongRunning.java b/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/LongRunning.java
index f2e3a5d85c..9ea1221489 100644
--- a/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/LongRunning.java
+++ b/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/LongRunning.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2019 the original author or authors.
+ * Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,7 +40,7 @@
public @interface LongRunning {
/**
- * The name of the variable/property used to determine whether long runnning tests
+ * The name of the variable/property used to determine whether long running tests
* should run.
* @return the name of the variable/property.
*/
diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java
index 925565cb29..842ca8f071 100644
--- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java
+++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021-2023 the original author or authors.
+ * Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -56,6 +56,7 @@
*
* @author Gary Russell
* @author Christian Tzolov
+ * @author Ngoc Nhan
* @since 2.4
*
*/
@@ -251,7 +252,7 @@ public void afterPropertiesSet() {
public boolean isRunning() {
this.lock.lock();
try {
- return this.consumers.size() > 0;
+ return !this.consumers.isEmpty();
}
finally {
this.lock.unlock();
@@ -262,7 +263,7 @@ public boolean isRunning() {
public void start() {
this.lock.lock();
try {
- if (this.consumers.size() == 0) {
+ if (this.consumers.isEmpty()) {
this.consumerCustomizer.accept(getListenerId(), this.builder);
if (this.simpleStream) {
this.consumers.add(this.builder.build());
diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java
index 0751e7dbd3..a0bde53ad2 100644
--- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java
+++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021-2023 the original author or authors.
+ * Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -55,6 +55,7 @@
*
* @author Gary Russell
* @author Christian Tzolov
+ * @author Ngoc Nhan
* @since 2.4
*
*/
@@ -78,8 +79,6 @@ public class RabbitStreamTemplate implements RabbitStreamOperations, Application
private boolean streamConverterSet;
- private Producer producer;
-
private String beanName;
private ProducerCustomizer producerCustomizer = (name, builder) -> { };
@@ -89,10 +88,12 @@ public class RabbitStreamTemplate implements RabbitStreamOperations, Application
@Nullable
private RabbitStreamTemplateObservationConvention observationConvention;
- private volatile boolean observationRegistryObtained;
-
private ObservationRegistry observationRegistry;
+ private volatile Producer producer;
+
+ private volatile boolean observationRegistryObtained;
+
/**
* Construct an instance with the provided {@link Environment}.
* @param environment the environment.
@@ -107,29 +108,31 @@ public RabbitStreamTemplate(Environment environment, String streamName) {
private Producer createOrGetProducer() {
- this.lock.lock();
- try {
- if (this.producer == null) {
- ProducerBuilder builder = this.environment.producerBuilder();
- if (this.superStreamRouting == null) {
- builder.stream(this.streamName);
- }
- else {
- builder.superStream(this.streamName)
- .routing(this.superStreamRouting);
- }
- this.producerCustomizer.accept(this.beanName, builder);
- this.producer = builder.build();
- if (!this.streamConverterSet) {
- ((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier(
- () -> this.producer.messageBuilder());
+ if (this.producer == null) {
+ this.lock.lock();
+ try {
+ if (this.producer == null) {
+ ProducerBuilder builder = this.environment.producerBuilder();
+ if (this.superStreamRouting == null) {
+ builder.stream(this.streamName);
+ }
+ else {
+ builder.superStream(this.streamName)
+ .routing(this.superStreamRouting);
+ }
+ this.producerCustomizer.accept(this.beanName, builder);
+ this.producer = builder.build();
+ if (!this.streamConverterSet) {
+ ((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier(
+ () -> this.producer.messageBuilder());
+ }
}
}
- return this.producer;
- }
- finally {
- this.lock.unlock();
+ finally {
+ this.lock.unlock();
+ }
}
+ return this.producer;
}
@Override
@@ -305,24 +308,13 @@ private ConfirmationHandler handleConfirm(CompletableFuture future, Obs
}
else {
int code = confStatus.getCode();
- String errorMessage;
- switch (code) {
- case Constants.CODE_MESSAGE_ENQUEUEING_FAILED:
- errorMessage = "Message Enqueueing Failed";
- break;
- case Constants.CODE_PRODUCER_CLOSED:
- errorMessage = "Producer Closed";
- break;
- case Constants.CODE_PRODUCER_NOT_AVAILABLE:
- errorMessage = "Producer Not Available";
- break;
- case Constants.CODE_PUBLISH_CONFIRM_TIMEOUT:
- errorMessage = "Publish Confirm Timeout";
- break;
- default:
- errorMessage = "Unknown code: " + code;
- break;
- }
+ String errorMessage = switch (code) {
+ case Constants.CODE_MESSAGE_ENQUEUEING_FAILED -> "Message Enqueueing Failed";
+ case Constants.CODE_PRODUCER_CLOSED -> "Producer Closed";
+ case Constants.CODE_PRODUCER_NOT_AVAILABLE -> "Producer Not Available";
+ case Constants.CODE_PUBLISH_CONFIRM_TIMEOUT -> "Publish Confirm Timeout";
+ default -> "Unknown code: " + code;
+ };
StreamSendException ex = new StreamSendException(errorMessage, code);
observation.error(ex);
observation.stop();
@@ -339,15 +331,17 @@ private ConfirmationHandler handleConfirm(CompletableFuture future, Obs
*/
@Override
public void close() {
- this.lock.lock();
- try {
- if (this.producer != null) {
- this.producer.close();
- this.producer = null;
+ if (this.producer != null) {
+ this.lock.lock();
+ try {
+ if (this.producer != null) {
+ this.producer.close();
+ this.producer = null;
+ }
+ }
+ finally {
+ this.lock.unlock();
}
- }
- finally {
- this.lock.unlock();
}
}
diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/support/converter/DefaultStreamMessageConverter.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/support/converter/DefaultStreamMessageConverter.java
index dd2d765fd7..614f4a6e79 100644
--- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/support/converter/DefaultStreamMessageConverter.java
+++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/support/converter/DefaultStreamMessageConverter.java
@@ -41,6 +41,7 @@
* Default {@link StreamMessageConverter}.
*
* @author Gary Russell
+ * @author Ngoc Nhan
* @since 2.4
*
*/
@@ -105,7 +106,7 @@ public com.rabbitmq.stream.Message fromMessage(Message message) throws MessageCo
.acceptIfNotNull(mProps.getGroupSequence(), propsBuilder::groupSequence)
.acceptIfNotNull(mProps.getReplyToGroupId(), propsBuilder::replyToGroupId);
ApplicationPropertiesBuilder appPropsBuilder = builder.applicationProperties();
- if (mProps.getHeaders().size() > 0) {
+ if (!mProps.getHeaders().isEmpty()) {
mProps.getHeaders().forEach((key, val) -> {
mapProp(key, val, appPropsBuilder);
});
diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamProvisioningTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamProvisioningTests.java
index b87daf8db8..75880c5e2d 100644
--- a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamProvisioningTests.java
+++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamProvisioningTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2022 the original author or authors.
+ * Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,6 +32,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
/**
@@ -40,6 +41,7 @@
*
*/
@SpringJUnitConfig
+@DirtiesContext
public class SuperStreamProvisioningTests extends AbstractTestContainerTests {
@Test
diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamConcurrentSACTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamConcurrentSACTests.java
index c7d34dd83e..50e1ff1058 100644
--- a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamConcurrentSACTests.java
+++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamConcurrentSACTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2022-2023 the original author or authors.
+ * Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,6 +37,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.rabbit.stream.config.SuperStream;
+import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import com.rabbitmq.stream.Environment;
@@ -49,6 +50,7 @@
*
*/
@SpringJUnitConfig
+@DirtiesContext
public class SuperStreamConcurrentSACTests extends AbstractTestContainerTests {
@Test
diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamSACTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamSACTests.java
index 1daaca2d33..a474f6926e 100644
--- a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamSACTests.java
+++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamSACTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2022-2023 the original author or authors.
+ * Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -45,6 +45,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.rabbit.stream.config.SuperStream;
+import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import com.rabbitmq.stream.Environment;
@@ -57,6 +58,7 @@
*
*/
@SpringJUnitConfig
+@DirtiesContext
public class SuperStreamSACTests extends AbstractTestContainerTests {
@Test
diff --git a/spring-rabbit-test/src/test/java/org/springframework/amqp/rabbit/test/context/SpringRabbitTestTests.java b/spring-rabbit-test/src/test/java/org/springframework/amqp/rabbit/test/context/SpringRabbitTestTests.java
index b4c2fc7e01..697e984a0f 100644
--- a/spring-rabbit-test/src/test/java/org/springframework/amqp/rabbit/test/context/SpringRabbitTestTests.java
+++ b/spring-rabbit-test/src/test/java/org/springframework/amqp/rabbit/test/context/SpringRabbitTestTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 the original author or authors.
+ * Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
/**
@@ -39,6 +40,7 @@
@RabbitAvailable
@SpringJUnitConfig
@SpringRabbitTest
+@DirtiesContext
public class SpringRabbitTestTests {
@Autowired
diff --git a/spring-rabbit-test/src/test/java/org/springframework/amqp/rabbit/test/examples/TestRabbitTemplateTests.java b/spring-rabbit-test/src/test/java/org/springframework/amqp/rabbit/test/examples/TestRabbitTemplateTests.java
index 4d8c69586b..6ee389b821 100644
--- a/spring-rabbit-test/src/test/java/org/springframework/amqp/rabbit/test/examples/TestRabbitTemplateTests.java
+++ b/spring-rabbit-test/src/test/java/org/springframework/amqp/rabbit/test/examples/TestRabbitTemplateTests.java
@@ -39,6 +39,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import com.rabbitmq.client.AMQP;
@@ -53,6 +54,7 @@
*
*/
@SpringJUnitConfig
+@DirtiesContext
public class TestRabbitTemplateTests {
@Autowired
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java
index 5d6753f0b9..b8a63057c2 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java
@@ -89,6 +89,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author FengYang Su
+ * @author Ngoc Nhan
*
* @since 1.6
*/
@@ -483,7 +484,7 @@ public RabbitConverterFuture convertSendAndReceiveAsType(String exchange,
private RabbitConverterFuture convertSendAndReceive(String exchange, String routingKey, Object object,
MessagePostProcessor messagePostProcessor, ParameterizedTypeReference responseType) {
- AsyncCorrelationData correlationData = new AsyncCorrelationData(messagePostProcessor, responseType,
+ AsyncCorrelationData correlationData = new AsyncCorrelationData<>(messagePostProcessor, responseType,
this.enableConfirms);
if (this.container != null) {
this.template.convertAndSend(exchange, routingKey, object, this.messagePostProcessor, correlationData);
@@ -736,7 +737,7 @@ public Message postProcessMessage(Message message, Correlation correlation) thro
messageToSend = correlationData.userPostProcessor.postProcessMessage(message);
}
String correlationId = getOrSetCorrelationIdAndSetReplyTo(messageToSend, correlationData);
- correlationData.future = new RabbitConverterFuture(correlationId, message,
+ correlationData.future = new RabbitConverterFuture<>(correlationId, message,
AsyncRabbitTemplate.this::canceler, AsyncRabbitTemplate.this::timeoutTask);
if (correlationData.enableConfirms) {
correlationData.setId(correlationId);
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/EnableRabbit.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/EnableRabbit.java
index b22cc737d9..a83d043419 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/EnableRabbit.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/EnableRabbit.java
@@ -104,7 +104,7 @@
*
Annotated methods can use flexible signature; in particular, it is possible to use
* the {@link org.springframework.messaging.Message Message} abstraction and related annotations,
* see {@link RabbitListener} Javadoc for more details. For instance, the following would
- * inject the content of the message and a a custom "myCounter" AMQP header:
+ * inject the content of the message and a custom "myCounter" AMQP header:
*
*
* @RabbitListener(containerFactory = "myRabbitListenerContainerFactory", queues = "myQueue")
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/MultiRabbitListenerAnnotationBeanPostProcessor.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/MultiRabbitListenerAnnotationBeanPostProcessor.java
index 939bd6b583..0764691e1e 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/MultiRabbitListenerAnnotationBeanPostProcessor.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/MultiRabbitListenerAnnotationBeanPostProcessor.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020-2021 the original author or authors.
+ * Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,6 +24,8 @@
import org.springframework.amqp.core.Declarable;
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.util.StringUtils;
/**
@@ -35,6 +37,7 @@
* configuration, preventing the server from automatic binding non-related structures.
*
* @author Wander Costa
+ * @author Ngoc Nhan
*
* @since 2.3
*/
@@ -70,14 +73,32 @@ private RabbitListener proxyIfAdminNotPresent(final RabbitListener rabbitListene
* @return The name of the RabbitAdmin bean.
*/
protected String resolveMultiRabbitAdminName(RabbitListener rabbitListener) {
- String admin = super.resolveExpressionAsString(rabbitListener.admin(), "admin");
- if (!StringUtils.hasText(admin) && StringUtils.hasText(rabbitListener.containerFactory())) {
- admin = rabbitListener.containerFactory() + RabbitListenerConfigUtils.MULTI_RABBIT_ADMIN_SUFFIX;
+
+ var admin = rabbitListener.admin();
+ if (StringUtils.hasText(admin)) {
+
+ var resolved = super.resolveExpression(admin);
+ if (resolved instanceof RabbitAdmin rabbitAdmin) {
+
+ return rabbitAdmin.getBeanName();
+ }
+
+ return super.resolveExpressionAsString(admin, "admin");
}
- if (!StringUtils.hasText(admin)) {
- admin = RabbitListenerConfigUtils.RABBIT_ADMIN_BEAN_NAME;
+
+ var containerFactory = rabbitListener.containerFactory();
+ if (StringUtils.hasText(containerFactory)) {
+
+ var resolved = super.resolveExpression(containerFactory);
+ if (resolved instanceof RabbitListenerContainerFactory> rlcf) {
+
+ return rlcf.getBeanName() + RabbitListenerConfigUtils.MULTI_RABBIT_ADMIN_SUFFIX;
+ }
+
+ return containerFactory + RabbitListenerConfigUtils.MULTI_RABBIT_ADMIN_SUFFIX;
}
- return admin;
+
+ return RabbitListenerConfigUtils.RABBIT_ADMIN_BEAN_NAME;
}
/**
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/RabbitListenerAnnotationBeanPostProcessor.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/RabbitListenerAnnotationBeanPostProcessor.java
index ffdd652657..e22e1b3860 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/RabbitListenerAnnotationBeanPostProcessor.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/RabbitListenerAnnotationBeanPostProcessor.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2023 the original author or authors.
+ * Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -76,6 +76,7 @@
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationUtils;
+import org.springframework.core.annotation.MergedAnnotation;
import org.springframework.core.annotation.MergedAnnotations;
import org.springframework.core.annotation.MergedAnnotations.SearchStrategy;
import org.springframework.core.convert.ConversionService;
@@ -119,6 +120,7 @@
* @author Gary Russell
* @author Alex Panchenko
* @author Artem Bilan
+ * @author Ngoc Nhan
*
* @since 1.4
*
@@ -316,12 +318,12 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
private TypeMetadata buildMetadata(Class> targetClass) {
List classLevelListeners = findListenerAnnotations(targetClass);
- final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
+ final boolean hasClassLevelListeners = !classLevelListeners.isEmpty();
final List methods = new ArrayList<>();
final List multiMethods = new ArrayList<>();
ReflectionUtils.doWithMethods(targetClass, method -> {
List listenerAnnotations = findListenerAnnotations(method);
- if (listenerAnnotations.size() > 0) {
+ if (!listenerAnnotations.isEmpty()) {
methods.add(new ListenerMethod(method,
listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
}
@@ -356,14 +358,14 @@ else if (source instanceof Method method) {
}
return !name.contains("$MockitoMock$");
})
- .map(ann -> ann.synthesize())
+ .map(MergedAnnotation::synthesize)
.collect(Collectors.toList());
}
private void processMultiMethodListeners(RabbitListener[] classLevelListeners, Method[] multiMethods,
Object bean, String beanName) {
- List checkedMethods = new ArrayList();
+ List checkedMethods = new ArrayList<>();
Method defaultMethod = null;
for (Method method : multiMethods) {
Method checked = checkProxy(method, bean);
@@ -733,7 +735,7 @@ else if (resolvedValueToUse instanceof Iterable) {
}
private String[] registerBeansForDeclaration(RabbitListener rabbitListener, Collection declarables) {
- List queues = new ArrayList();
+ List queues = new ArrayList<>();
if (this.beanFactory instanceof ConfigurableBeanFactory) {
for (QueueBinding binding : rabbitListener.bindings()) {
String queueName = declareQueue(binding.value(), declarables);
@@ -879,7 +881,7 @@ private Map resolveArguments(Argument[] arguments) {
}
}
}
- return map.size() < 1 ? null : map;
+ return map.isEmpty() ? null : map;
}
private void addToMap(Map map, String key, Object value, Class> typeClass, String typeName) {
@@ -892,7 +894,7 @@ private void addToMap(Map map, String key, Object value, Class
}
}
else {
- if (value instanceof String && !StringUtils.hasText((String) value)) {
+ if (value instanceof String string && !StringUtils.hasText(string)) {
putEmpty(map, key);
}
else {
@@ -961,7 +963,7 @@ else if (resolved instanceof Integer) {
}
}
- private Object resolveExpression(String value) {
+ protected Object resolveExpression(String value) {
String resolvedValue = resolve(value);
return this.resolver.evaluate(resolvedValue, this.expressionContext);
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/batch/SimpleBatchingStrategy.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/batch/SimpleBatchingStrategy.java
index d1558f7a10..bd458351c5 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/batch/SimpleBatchingStrategy.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/batch/SimpleBatchingStrategy.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2022 the original author or authors.
+ * Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@
* length field.
*
* @author Gary Russell
+ * @author Ngoc Nhan
* @since 1.4.1
*
*/
@@ -50,7 +51,7 @@ public class SimpleBatchingStrategy implements BatchingStrategy {
private final long timeout;
- private final List messages = new ArrayList();
+ private final List messages = new ArrayList<>();
private String exchange;
@@ -87,7 +88,7 @@ public MessageBatch addToBatch(String exch, String routKey, Message message) {
}
int bufferUse = Integer.BYTES + message.getBody().length;
MessageBatch batch = null;
- if (this.messages.size() > 0 && this.currentSize + bufferUse > this.bufferLimit) {
+ if (!this.messages.isEmpty() && this.currentSize + bufferUse > this.bufferLimit) {
batch = doReleaseBatch();
this.exchange = exch;
this.routingKey = routKey;
@@ -103,16 +104,15 @@ public MessageBatch addToBatch(String exch, String routKey, Message message) {
@Override
public Date nextRelease() {
- if (this.messages.size() == 0 || this.timeout <= 0) {
+ if (this.messages.isEmpty() || this.timeout <= 0) {
return null;
}
- else if (this.currentSize >= this.bufferLimit) {
+ if (this.currentSize >= this.bufferLimit) {
// release immediately, we're already over the limit
return new Date();
}
- else {
- return new Date(System.currentTimeMillis() + this.timeout);
- }
+
+ return new Date(System.currentTimeMillis() + this.timeout);
}
@Override
@@ -121,13 +121,12 @@ public Collection releaseBatches() {
if (batch == null) {
return Collections.emptyList();
}
- else {
- return Collections.singletonList(batch);
- }
+
+ return Collections.singletonList(batch);
}
private MessageBatch doReleaseBatch() {
- if (this.messages.size() < 1) {
+ if (this.messages.isEmpty()) {
return null;
}
Message message = assembleMessage();
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractExchangeParser.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractExchangeParser.java
index 705fd84056..434e57d676 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractExchangeParser.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractExchangeParser.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -34,6 +34,7 @@
* @author Gary Russell
* @author Felipe Gutierrez
* @author Artem Bilan
+ * @author Ngoc Nhan
*
*/
public abstract class AbstractExchangeParser extends AbstractSingleBeanDefinitionParser {
@@ -144,7 +145,7 @@ private void parseArguments(Element element, String argumentsElementName, Parser
Map, ?> map = parserContext.getDelegate().parseMapElement(argumentsElement,
builder.getRawBeanDefinition());
if (StringUtils.hasText(ref)) {
- if (map != null && map.size() > 0) {
+ if (map != null && !map.isEmpty()) {
parserContext.getReaderContext().error("You cannot have both a 'ref' and a nested map", element);
}
if (propertyName == null) {
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java
index f1ec63668a..bdbce98737 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2023 the original author or authors.
+ * Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -344,7 +344,7 @@ public void setObservationConvention(RabbitListenerObservationConvention observa
/**
* Set to true to stop the container after the current message(s) are processed and
* requeue any prefetched. Useful when using exclusive or single-active consumers.
- * @param forceStop true to stop when current messsage(s) are processed.
+ * @param forceStop true to stop when current message(s) are processed.
* @since 2.4.15
*/
public void setForceStop(boolean forceStop) {
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/BaseRabbitListenerContainerFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/BaseRabbitListenerContainerFactory.java
index 5a1fab024a..6363bbe18c 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/BaseRabbitListenerContainerFactory.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/BaseRabbitListenerContainerFactory.java
@@ -43,6 +43,7 @@
* @param the container type that the factory creates.
*
* @author Gary Russell
+ * @author Ngoc Nhan
* @since 2.4
*
*/
@@ -67,6 +68,8 @@ public abstract class BaseRabbitListenerContainerFactory map = new ManagedMap();
+ ManagedMap map = new ManagedMap<>();
map.put(new TypedStringValue(key), new TypedStringValue(value));
builder.addPropertyValue("arguments", map);
}
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java
index 0a070e1eca..b2bc26fb8a 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java
@@ -56,6 +56,7 @@
* @author Artem Bilan
* @author Johno Crawford
* @author Jeonggi Kim
+ * @author Ngoc Nhan
*
* @since 2.0
*
@@ -542,7 +543,7 @@ protected AbstractMessageListenerContainer createInstance() { // NOSONAR complex
.acceptIfNotNull(this.exclusiveConsumerExceptionLogger,
container::setExclusiveConsumerExceptionLogger)
.acceptIfNotNull(this.micrometerEnabled, container::setMicrometerEnabled)
- .acceptIfCondition(this.micrometerTags.size() > 0, this.micrometerTags,
+ .acceptIfCondition(!this.micrometerTags.isEmpty(), this.micrometerTags,
container::setMicrometerTags);
if (this.smlcCustomizer != null && this.type.equals(Type.simple)) {
this.smlcCustomizer.configure((SimpleMessageListenerContainer) container);
@@ -574,14 +575,13 @@ private AbstractMessageListenerContainer createContainer() {
.acceptIfNotNull(this.retryDeclarationInterval, container::setRetryDeclarationInterval);
return container;
}
- else {
- DirectMessageListenerContainer container = new DirectMessageListenerContainer(this.connectionFactory);
- JavaUtils.INSTANCE
- .acceptIfNotNull(this.consumersPerQueue, container::setConsumersPerQueue)
- .acceptIfNotNull(this.taskScheduler, container::setTaskScheduler)
- .acceptIfNotNull(this.monitorInterval, container::setMonitorInterval);
- return container;
- }
+
+ DirectMessageListenerContainer container = new DirectMessageListenerContainer(this.connectionFactory);
+ JavaUtils.INSTANCE
+ .acceptIfNotNull(this.consumersPerQueue, container::setConsumersPerQueue)
+ .acceptIfNotNull(this.taskScheduler, container::setTaskScheduler)
+ .acceptIfNotNull(this.monitorInterval, container::setMonitorInterval);
+ return container;
}
@Override
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerParser.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerParser.java
index 0fb64b643c..71acf60981 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerParser.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerParser.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2021 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -42,6 +42,7 @@
/**
* @author Mark Fisher
* @author Gary Russell
+ * @author Ngoc Nhan
* @since 1.0
*/
class ListenerContainerParser implements BeanDefinitionParser {
@@ -100,8 +101,8 @@ public BeanDefinition parse(Element element, ParserContext parserContext) {
}
List childElements = DomUtils.getChildElementsByTagName(element, LISTENER_ELEMENT);
- for (int i = 0; i < childElements.size(); i++) {
- parseListener(childElements.get(i), element, parserContext, containerList);
+ for (Element childElement : childElements) {
+ parseListener(childElement, element, parserContext, containerList);
}
parserContext.popAndRegisterContainingComponent();
@@ -188,22 +189,22 @@ private void parseListener(Element listenerEle, Element containerEle, ParserCont
}
else {
String[] names = StringUtils.commaDelimitedListToStringArray(queues);
- List values = new ManagedList();
- for (int i = 0; i < names.length; i++) {
- values.add(new RuntimeBeanReference(names[i].trim()));
+ List values = new ManagedList<>();
+ for (String name : names) {
+ values.add(new RuntimeBeanReference(name.trim()));
}
containerDef.getPropertyValues().add("queues", values);
}
}
- ManagedMap args = new ManagedMap();
+ ManagedMap args = new ManagedMap<>();
String priority = listenerEle.getAttribute("priority");
if (StringUtils.hasText(priority)) {
args.put("x-priority", new TypedStringValue(priority, Integer.class));
}
- if (args.size() > 0) {
+ if (!args.isEmpty()) {
containerDef.getPropertyValues().add("consumerArguments", args);
}
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/NamespaceUtils.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/NamespaceUtils.java
index afdd5ada8a..5b99acef0f 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/NamespaceUtils.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/NamespaceUtils.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,6 +37,7 @@
* @author Mark Pollack
* @author Dave Syer
* @author Gary Russell
+ * @author Ngoc Nhan
*
*/
public abstract class NamespaceUtils {
@@ -125,7 +126,7 @@ public static boolean addConstructorArgValueIfAttributeDefined(BeanDefinitionBui
* @param builder the bean definition builder to be configured
* @param element the XML element where the attribute should be defined
* @param attributeName the name of the attribute whose value will be used as a constructor argument
- * @param defaultValue the default value to use if the attirbute is not set
+ * @param defaultValue the default value to use if the attribute is not set
*/
public static void addConstructorArgBooleanValueIfAttributeDefined(BeanDefinitionBuilder builder, Element element,
String attributeName, boolean defaultValue) {
@@ -249,7 +250,7 @@ public static void parseDeclarationControls(Element element, BeanDefinitionBuild
String admins = element.getAttribute("declared-by");
if (StringUtils.hasText(admins)) {
String[] adminBeanNames = admins.split(",");
- ManagedList adminBeanRefs = new ManagedList();
+ ManagedList adminBeanRefs = new ManagedList<>();
for (String adminBeanName : adminBeanNames) {
adminBeanRefs.add(new RuntimeBeanReference(adminBeanName.trim()));
}
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/QueueParser.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/QueueParser.java
index 4bce257e1a..38314dad06 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/QueueParser.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/QueueParser.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,6 +33,7 @@
* @author Gary Russell
* @author Felipe Gutierrez
* @author Artem Bilan
+ * @author Ngoc Nhan
*
*/
public class QueueParser extends AbstractSingleBeanDefinitionParser {
@@ -134,7 +135,7 @@ private void parseArguments(Element element, ParserContext parserContext, BeanDe
Map, ?> map = parserContext.getDelegate().parseMapElement(argumentsElement,
builder.getRawBeanDefinition());
if (StringUtils.hasText(ref)) {
- if (map != null && map.size() > 0) {
+ if (map != null && !map.isEmpty()) {
parserContext.getReaderContext()
.error("You cannot have both a 'ref' and a nested map", element);
}
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RabbitNamespaceUtils.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RabbitNamespaceUtils.java
index 69ba403406..c219c7bf7c 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RabbitNamespaceUtils.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RabbitNamespaceUtils.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2021 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -358,28 +358,22 @@ public static BeanDefinition parseContainer(Element containerEle, ParserContext
}
private static AcknowledgeMode parseAcknowledgeMode(Element ele, ParserContext parserContext) {
- AcknowledgeMode acknowledgeMode = null;
String acknowledge = ele.getAttribute(ACKNOWLEDGE_ATTRIBUTE);
if (StringUtils.hasText(acknowledge)) {
- if (ACKNOWLEDGE_AUTO.equals(acknowledge)) {
- acknowledgeMode = AcknowledgeMode.AUTO;
- }
- else if (ACKNOWLEDGE_MANUAL.equals(acknowledge)) {
- acknowledgeMode = AcknowledgeMode.MANUAL;
- }
- else if (ACKNOWLEDGE_NONE.equals(acknowledge)) {
- acknowledgeMode = AcknowledgeMode.NONE;
- }
- else {
- parserContext.getReaderContext().error(
+ return switch (acknowledge) {
+ case ACKNOWLEDGE_AUTO -> AcknowledgeMode.AUTO;
+ case ACKNOWLEDGE_MANUAL -> AcknowledgeMode.MANUAL;
+ case ACKNOWLEDGE_NONE -> AcknowledgeMode.NONE;
+ default -> {
+ parserContext.getReaderContext().error(
"Invalid listener container 'acknowledge' setting [" + acknowledge
- + "]: only \"auto\", \"manual\", and \"none\" supported.", ele);
- }
- return acknowledgeMode;
- }
- else {
- return null;
+ + "]: only \"auto\", \"manual\", and \"none\" supported.", ele);
+ yield null;
+ }
+ };
}
+
+ return null;
}
}
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatefulRetryOperationsInterceptorFactoryBean.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatefulRetryOperationsInterceptorFactoryBean.java
index 3aa40f94ef..4e79caaa98 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatefulRetryOperationsInterceptorFactoryBean.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatefulRetryOperationsInterceptorFactoryBean.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2022 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,12 +27,14 @@
import org.springframework.amqp.rabbit.retry.MessageKeyGenerator;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.NewMessageIdentifier;
+import org.springframework.lang.Nullable;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.interceptor.MethodArgumentsKeyGenerator;
import org.springframework.retry.interceptor.MethodInvocationRecoverer;
import org.springframework.retry.interceptor.NewMethodArgumentsIdentifier;
import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;
import org.springframework.retry.support.RetryTemplate;
+import org.springframework.util.Assert;
/**
* Convenient factory bean for creating a stateful retry interceptor for use in a message listener container, giving you
@@ -47,6 +49,7 @@
*
* @author Dave Syer
* @author Gary Russell
+ * @author Ngoc Nhan
*
* @see RetryOperations#execute(org.springframework.retry.RetryCallback, org.springframework.retry.RecoveryCallback,
* org.springframework.retry.RetryState)
@@ -60,8 +63,8 @@ public class StatefulRetryOperationsInterceptorFactoryBean extends AbstractRetry
private NewMessageIdentifier newMessageIdentifier;
- public void setMessageKeyGenerator(MessageKeyGenerator messageKeyGeneretor) {
- this.messageKeyGenerator = messageKeyGeneretor;
+ public void setMessageKeyGenerator(MessageKeyGenerator messageKeyGenerator) {
+ this.messageKeyGenerator = messageKeyGenerator;
}
public void setNewMessageIdentifier(NewMessageIdentifier newMessageIdentifier) {
@@ -90,9 +93,8 @@ private NewMethodArgumentsIdentifier createNewItemIdentifier() {
if (StatefulRetryOperationsInterceptorFactoryBean.this.newMessageIdentifier == null) {
return !message.getMessageProperties().isRedelivered();
}
- else {
- return StatefulRetryOperationsInterceptorFactoryBean.this.newMessageIdentifier.isNew(message);
- }
+
+ return StatefulRetryOperationsInterceptorFactoryBean.this.newMessageIdentifier.isNew(message);
};
}
@@ -120,6 +122,7 @@ else if (arg instanceof List && messageRecoverer instanceof MessageBatchRecovere
private MethodArgumentsKeyGenerator createKeyGenerator() {
return args -> {
Message message = argToMessage(args);
+ Assert.notNull(message, "The 'args' must not convert to null");
if (StatefulRetryOperationsInterceptorFactoryBean.this.messageKeyGenerator == null) {
String messageId = message.getMessageProperties().getMessageId();
if (messageId == null && message.getMessageProperties().isRedelivered()) {
@@ -127,23 +130,20 @@ private MethodArgumentsKeyGenerator createKeyGenerator() {
}
return messageId;
}
- else {
- return StatefulRetryOperationsInterceptorFactoryBean.this.messageKeyGenerator.getKey(message);
- }
+ return StatefulRetryOperationsInterceptorFactoryBean.this.messageKeyGenerator.getKey(message);
};
}
- @SuppressWarnings("unchecked")
+ @Nullable
private Message argToMessage(Object[] args) {
Object arg = args[1];
- Message message = null;
if (arg instanceof Message msg) {
- message = msg;
+ return msg;
}
- else if (arg instanceof List) {
- message = ((List) arg).get(0);
+ if (arg instanceof List> list) {
+ return (Message) list.get(0);
}
- return message;
+ return null;
}
@Override
@@ -151,9 +151,4 @@ public Class> getObjectType() {
return StatefulRetryOperationsInterceptor.class;
}
- @Override
- public boolean isSingleton() {
- return true;
- }
-
}
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/TemplateParser.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/TemplateParser.java
index 5259fa0e25..c9c5c3f908 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/TemplateParser.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/TemplateParser.java
@@ -34,6 +34,7 @@
* @author Dave Syer
* @author Gary Russell
* @author Artem Bilan
+ * @author Ngoc Nhan
*/
class TemplateParser extends AbstractSingleBeanDefinitionParser {
@@ -160,7 +161,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
BeanDefinition replyContainer = null;
Element childElement = null;
List childElements = DomUtils.getChildElementsByTagName(element, LISTENER_ELEMENT);
- if (childElements.size() > 0) {
+ if (!childElements.isEmpty()) {
childElement = childElements.get(0);
}
if (childElement != null) {
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java
index 33cb11dccf..b7ecb1a80d 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java
@@ -166,6 +166,7 @@ public void handleRecovery(Recoverable recoverable) {
@Nullable
private BackOff connectionCreatingBackOff;
+
/**
* Create a new AbstractConnectionFactory for the given target ConnectionFactory, with no publisher connection
* factory.
@@ -343,7 +344,16 @@ public int getPort() {
/**
* Set addresses for clustering. This property overrides the host+port properties if not empty.
- * @param addresses list of addresses with form "host[:port],..."
+ * @param addresses list of addresses in form {@code host[:port]}.
+ * @since 3.2.1
+ */
+ public void setAddresses(List addresses) {
+ Assert.notEmpty(addresses, "Addresses must not be empty");
+ setAddresses(String.join(",", addresses));
+ }
+ /**
+ * Set addresses for clustering. This property overrides the host+port properties if not empty.
+ * @param addresses list of addresses with form {@code host1[:port1],host2[:port2],...}.
*/
public void setAddresses(String addresses) {
this.lock.lock();
@@ -478,8 +488,9 @@ protected ExecutorService getExecutorService() {
}
/**
- * How long to wait (milliseconds) for a response to a connection close operation from the broker; default 30000 (30
- * seconds).
+ * How long to wait (milliseconds) for a response to a connection close operation from the broker;
+ * default 30000 (30 seconds).
+ * Also used for {@link com.rabbitmq.client.Channel#waitForConfirms()}.
* @param closeTimeout the closeTimeout to set.
*/
public void setCloseTimeout(int closeTimeout) {
@@ -580,8 +591,8 @@ public ConnectionFactory getPublisherConnectionFactory() {
protected final Connection createBareConnection() {
try {
String connectionName = this.connectionNameStrategy.obtainNewConnectionName(this);
-
com.rabbitmq.client.Connection rabbitConnection = connect(connectionName);
+ rabbitConnection.addShutdownListener(this);
Connection connection = new SimpleConnection(rabbitConnection, this.closeTimeout,
this.connectionCreatingBackOff == null ? null : this.connectionCreatingBackOff.start());
if (rabbitConnection instanceof AutorecoveringConnection auto) {
@@ -732,16 +743,8 @@ public String toString() {
}
}
- private static final class ConnectionBlockedListener implements BlockedListener {
-
- private final Connection connection;
-
- private final ApplicationEventPublisher applicationEventPublisher;
-
- ConnectionBlockedListener(Connection connection, ApplicationEventPublisher applicationEventPublisher) {
- this.connection = connection;
- this.applicationEventPublisher = applicationEventPublisher;
- }
+ private record ConnectionBlockedListener(Connection connection, ApplicationEventPublisher applicationEventPublisher)
+ implements BlockedListener {
@Override
public void handleBlocked(String reason) {
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractRoutingConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractRoutingConnectionFactory.java
index fdf057a34b..be70f77cc5 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractRoutingConnectionFactory.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractRoutingConnectionFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2022 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -36,15 +36,16 @@
* @author Josh Chappelle
* @author Gary Russell
* @author Leonardo Ferreira
+ * @author Ngoc Nhan
* @since 1.3
*/
public abstract class AbstractRoutingConnectionFactory implements ConnectionFactory, RoutingConnectionFactory,
InitializingBean, DisposableBean {
private final Map targetConnectionFactories =
- new ConcurrentHashMap();
+ new ConcurrentHashMap<>();
- private final List connectionListeners = new ArrayList();
+ private final List connectionListeners = new ArrayList<>();
private ConnectionFactory defaultTargetConnectionFactory;
@@ -68,7 +69,7 @@ public void setTargetConnectionFactories(Map targetCo
Assert.noNullElements(targetConnectionFactories.values().toArray(),
"'targetConnectionFactories' cannot have null values.");
this.targetConnectionFactories.putAll(targetConnectionFactories);
- targetConnectionFactories.values().stream().forEach(cf -> checkConfirmsAndReturns(cf));
+ targetConnectionFactories.values().forEach(this::checkConfirmsAndReturns);
}
/**
@@ -292,7 +293,7 @@ public void destroy() {
@Override
public void resetConnection() {
- this.targetConnectionFactories.values().forEach(factory -> factory.resetConnection());
+ this.targetConnectionFactories.values().forEach(ConnectionFactory::resetConnection);
this.defaultTargetConnectionFactory.resetConnection();
}
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java
index f14aa989b9..302b18a5cd 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java
@@ -861,8 +861,6 @@ private void refreshProxyConnection(ChannelCachingConnectionProxy connection) {
*/
@Override
public final void destroy() {
- super.destroy();
- resetConnection();
if (getContextStopped()) {
this.stopped = true;
this.connectionLock.lock();
@@ -890,6 +888,8 @@ public final void destroy() {
this.connectionLock.unlock();
}
}
+ super.destroy();
+ resetConnection();
}
/**
@@ -1087,8 +1087,6 @@ public String toString() {
private final class CachedChannelInvocationHandler implements InvocationHandler {
- private static final int ASYNC_CLOSE_TIMEOUT = 5_000;
-
private final ChannelCachingConnectionProxy theConnection;
private final Deque channelList;
@@ -1302,7 +1300,7 @@ private void returnToCache(ChannelProxy proxy) {
getChannelsExecutor()
.execute(() -> {
try {
- publisherCallbackChannel.waitForConfirms(ASYNC_CLOSE_TIMEOUT);
+ publisherCallbackChannel.waitForConfirms(getCloseTimeout());
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
@@ -1426,10 +1424,10 @@ private void asyncClose() {
executorService.execute(() -> {
try {
if (ConfirmType.CORRELATED.equals(CachingConnectionFactory.this.confirmType)) {
- channel.waitForConfirmsOrDie(ASYNC_CLOSE_TIMEOUT);
+ channel.waitForConfirmsOrDie(getCloseTimeout());
}
else {
- Thread.sleep(ASYNC_CLOSE_TIMEOUT);
+ Thread.sleep(5_000); // NOSONAR - some time to give the channel a chance to ack
}
}
catch (@SuppressWarnings(UNUSED) InterruptedException e1) {
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CompositeChannelListener.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CompositeChannelListener.java
index 81c7dd532f..66399f31e4 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CompositeChannelListener.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CompositeChannelListener.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,11 +25,12 @@
/**
* @author Dave Syer
* @author Gary Russell
+ * @author Ngoc Nhan
*
*/
public class CompositeChannelListener implements ChannelListener {
- private List delegates = new ArrayList();
+ private List delegates = new ArrayList<>();
public void onCreate(Channel channel, boolean transactional) {
for (ChannelListener delegate : this.delegates) {
@@ -45,7 +46,7 @@ public void onShutDown(ShutdownSignalException signal) {
}
public void setDelegates(List extends ChannelListener> delegates) {
- this.delegates = new ArrayList(delegates);
+ this.delegates = new ArrayList<>(delegates);
}
public void addDelegate(ChannelListener delegate) {
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CompositeConnectionListener.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CompositeConnectionListener.java
index ce1b01a7b2..e8d9746446 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CompositeConnectionListener.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CompositeConnectionListener.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2021 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,15 +23,16 @@
import com.rabbitmq.client.ShutdownSignalException;
/**
- * A composite listener that invokes its delegages in turn.
+ * A composite listener that invokes its delegates in turn.
*
* @author Dave Syer
* @author Gary Russell
+ * @author Ngoc Nhan
*
*/
public class CompositeConnectionListener implements ConnectionListener {
- private List delegates = new CopyOnWriteArrayList();
+ private List delegates = new CopyOnWriteArrayList<>();
@Override
public void onCreate(Connection connection) {
@@ -54,7 +55,7 @@ public void onFailed(Exception exception) {
}
public void setDelegates(List extends ConnectionListener> delegates) {
- this.delegates = new ArrayList(delegates);
+ this.delegates = new ArrayList<>(delegates);
}
public void addDelegate(ConnectionListener delegate) {
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConsumerChannelRegistry.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConsumerChannelRegistry.java
index 0aac6947b3..e72c338b04 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConsumerChannelRegistry.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConsumerChannelRegistry.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2022 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
* tangle with RabbitResourceHolder.
*
* @author Gary Russell
+ * @author Ngoc Nhan
* @since 1.2
*
*/
@@ -39,7 +40,7 @@ public final class ConsumerChannelRegistry {
private static final Log logger = LogFactory.getLog(ConsumerChannelRegistry.class); // NOSONAR - lower case
private static final ThreadLocal consumerChannel // NOSONAR - lower case
- = new ThreadLocal();
+ = new ThreadLocal<>();
private ConsumerChannelRegistry() {
}
@@ -83,11 +84,9 @@ public static void unRegisterConsumerChannel() {
@Nullable
public static Channel getConsumerChannel() {
ChannelHolder channelHolder = consumerChannel.get();
- Channel channel = null;
- if (channelHolder != null) {
- channel = channelHolder.getChannel();
- }
- return channel;
+ return channelHolder != null
+ ? channelHolder.getChannel()
+ : null;
}
/**
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java
index e84ed71190..c27986322b 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2015-2023 the original author or authors.
+ * Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -52,6 +52,7 @@
*
* @author Gary Russell
* @author Christian Tzolov
+ * @author Ngoc Nhan
* @since 1.2
*/
public class LocalizedQueueConnectionFactory implements ConnectionFactory, RoutingConnectionFactory, DisposableBean,
@@ -61,13 +62,13 @@ public class LocalizedQueueConnectionFactory implements ConnectionFactory, Routi
private final Lock lock = new ReentrantLock();
- private final Map nodeFactories = new HashMap();
+ private final Map nodeFactories = new HashMap<>();
private final ConnectionFactory defaultConnectionFactory;
private final String[] adminUris;
- private final Map nodeToAddress = new HashMap();
+ private final Map nodeToAddress = new HashMap<>();
private final String vhost;
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PendingConfirm.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PendingConfirm.java
index 95c71fe744..5aa67760c9 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PendingConfirm.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PendingConfirm.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2020 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
* expired. It also holds {@link CorrelationData} for
* the client to correlate a confirm with a sent message.
* @author Gary Russell
+ * @author Ngoc Nhan
* @since 1.0.1
*
*/
@@ -115,7 +116,7 @@ public void setReturned(boolean isReturned) {
* @since 2.2.10
*/
public boolean waitForReturnIfNeeded() throws InterruptedException {
- return this.returned ? this.latch.await(RETURN_CALLBACK_TIMEOUT, TimeUnit.SECONDS) : true;
+ return !this.returned || this.latch.await(RETURN_CALLBACK_TIMEOUT, TimeUnit.SECONDS);
}
/**
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java
index a458be72ab..321c8925ab 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java
@@ -55,6 +55,7 @@
* @author Gary Russell
* @author Leonardo Ferreira
* @author Christian Tzolov
+ * @author Ngoc Nhan
* @since 2.3
*
*/
@@ -255,23 +256,21 @@ private Channel createProxy(Channel channel, boolean transacted) {
Advice advice =
(MethodInterceptor) invocation -> {
String method = invocation.getMethod().getName();
- switch (method) {
- case "close":
- handleClose(channel, transacted, proxy);
- return null;
- case "getTargetChannel":
- return channel;
- case "isTransactional":
- return transacted;
- case "confirmSelect":
- confirmSelected.set(true);
- return channel.confirmSelect();
- case "isConfirmSelected":
- return confirmSelected.get();
- case "isPublisherConfirms":
- return false;
- }
- return null;
+ return switch (method) {
+ case "close" -> {
+ handleClose(channel, transacted, proxy);
+ yield null;
+ }
+ case "getTargetChannel" -> channel;
+ case "isTransactional" -> transacted;
+ case "confirmSelect" -> {
+ confirmSelected.set(true);
+ yield channel.confirmSelect();
+ }
+ case "isConfirmSelected" -> confirmSelected.get();
+ case "isPublisherConfirms" -> false;
+ default -> null;
+ };
};
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(advice);
advisor.addMethodName("close");
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.java
index c298c118a9..74fb85bc95 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2023 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -91,6 +91,7 @@
* @author Arnaud Cogoluègnes
* @author Artem Bilan
* @author Christian Tzolov
+ * @author Ngoc Nhan
*
* @since 1.0.1
*
@@ -903,12 +904,12 @@ public int getPendingConfirmsCount() {
@Override
public void addListener(Listener listener) {
Assert.notNull(listener, "Listener cannot be null");
- if (this.listeners.size() == 0) {
+ if (this.listeners.isEmpty()) {
this.delegate.addConfirmListener(this);
this.delegate.addReturnListener(this);
}
if (this.listeners.putIfAbsent(listener.getUUID(), listener) == null) {
- this.pendingConfirms.put(listener, new ConcurrentSkipListMap());
+ this.pendingConfirms.put(listener, new ConcurrentSkipListMap<>());
if (this.logger.isDebugEnabled()) {
this.logger.debug("Added listener " + listener);
}
@@ -921,27 +922,26 @@ public Collection expire(Listener listener, long cutoffTime) {
try {
SortedMap pendingConfirmsForListener = this.pendingConfirms.get(listener);
if (pendingConfirmsForListener == null) {
- return Collections.emptyList();
+ return Collections.emptyList();
}
- else {
- List expired = new ArrayList();
- Iterator> iterator = pendingConfirmsForListener.entrySet().iterator();
- while (iterator.hasNext()) {
- PendingConfirm pendingConfirm = iterator.next().getValue();
- if (pendingConfirm.getTimestamp() < cutoffTime) {
- expired.add(pendingConfirm);
- iterator.remove();
- CorrelationData correlationData = pendingConfirm.getCorrelationData();
- if (correlationData != null && StringUtils.hasText(correlationData.getId())) {
- this.pendingReturns.remove(correlationData.getId()); // NOSONAR never null
- }
- }
- else {
- break;
+
+ List expired = new ArrayList<>();
+ Iterator> iterator = pendingConfirmsForListener.entrySet().iterator();
+ while (iterator.hasNext()) {
+ PendingConfirm pendingConfirm = iterator.next().getValue();
+ if (pendingConfirm.getTimestamp() < cutoffTime) {
+ expired.add(pendingConfirm);
+ iterator.remove();
+ CorrelationData correlationData = pendingConfirm.getCorrelationData();
+ if (correlationData != null && StringUtils.hasText(correlationData.getId())) {
+ this.pendingReturns.remove(correlationData.getId()); // NOSONAR never null
}
}
- return expired;
+ else {
+ break;
+ }
}
+ return expired;
}
finally {
this.lock.unlock();
@@ -1025,7 +1025,7 @@ private void processMultipleAck(long seq, boolean ack) {
*/
Map involvedListeners = this.listenerForSeq.headMap(seq + 1);
// eliminate duplicates
- Set listenersForAcks = new HashSet(involvedListeners.values());
+ Set listenersForAcks = new HashSet<>(involvedListeners.values());
for (Listener involvedListener : listenersForAcks) {
// find all unack'd confirms for this listener and handle them
SortedMap confirmsMap = this.pendingConfirms.get(involvedListener);
@@ -1047,7 +1047,7 @@ private void processMultipleAck(long seq, boolean ack) {
}
}
}
- List seqs = new ArrayList(involvedListeners.keySet());
+ List seqs = new ArrayList<>(involvedListeners.keySet());
for (Long key : seqs) {
this.listenerForSeq.remove(key);
}
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitConnectionFactoryBean.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitConnectionFactoryBean.java
index d12361ee1d..944402231e 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitConnectionFactoryBean.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitConnectionFactoryBean.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2023 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -63,7 +63,7 @@
* optionally enabling SSL, with or without certificate validation. When
* {@link #setSslPropertiesLocation(Resource) sslPropertiesLocation} is not null, the
* default implementation loads a {@code PKCS12} keystore and a {@code JKS} truststore
- * using the supplied properties and intializes key and trust manager factories, using
+ * using the supplied properties and initializes key and trust manager factories, using
* algorithm {@code SunX509} by default. These are then used to initialize an
* {@link SSLContext} using the {@link #setSslAlgorithm(String) sslAlgorithm} (default
* TLSv1.2, falling back to TLSv1.1, if 1.2 is not available).
@@ -79,6 +79,7 @@
* @author Hareendran
* @author Dominique Villard
* @author Zachary DeLuca
+ * @author Ngoc Nhan
*
* @since 1.4
*/
@@ -360,12 +361,11 @@ protected String getKeyStoreType() {
if (this.keyStoreType == null && this.sslProperties.getProperty(KEY_STORE_TYPE) == null) {
return KEY_STORE_DEFAULT_TYPE;
}
- else if (this.keyStoreType != null) {
+ if (this.keyStoreType != null) {
return this.keyStoreType;
}
- else {
- return this.sslProperties.getProperty(KEY_STORE_TYPE);
- }
+
+ return this.sslProperties.getProperty(KEY_STORE_TYPE);
}
/**
@@ -389,12 +389,11 @@ protected String getTrustStoreType() {
if (this.trustStoreType == null && this.sslProperties.getProperty(TRUST_STORE_TYPE) == null) {
return TRUST_STORE_DEFAULT_TYPE;
}
- else if (this.trustStoreType != null) {
+ if (this.trustStoreType != null) {
return this.trustStoreType;
}
- else {
- return this.sslProperties.getProperty(TRUST_STORE_TYPE);
- }
+
+ return this.sslProperties.getProperty(TRUST_STORE_TYPE);
}
/**
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitResourceHolder.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitResourceHolder.java
index 8cea93e04e..80fd02e75a 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitResourceHolder.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitResourceHolder.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2021 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -45,6 +45,7 @@
* @author Mark Fisher
* @author Dave Syer
* @author Gary Russell
+ * @author Ngoc Nhan
*
* @see org.springframework.amqp.rabbit.transaction.RabbitTransactionManager
* @see org.springframework.amqp.rabbit.core.RabbitTemplate
@@ -120,7 +121,7 @@ public final void addChannel(Channel channel, @Nullable Connection connection) {
if (connection != null) {
List channelsForConnection = this.channelsPerConnection.get(connection);
if (channelsForConnection == null) {
- channelsForConnection = new LinkedList();
+ channelsForConnection = new LinkedList<>();
this.channelsPerConnection.put(connection, channelsForConnection);
}
channelsForConnection.add(channel);
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java
index 4a958f6529..e135d365a2 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java
@@ -228,12 +228,7 @@ public static void setPhysicalCloseRequired(Channel channel, boolean b) {
*/
public static boolean isPhysicalCloseRequired() {
Boolean mustClose = physicalCloseRequired.get();
- if (mustClose == null) {
- return false;
- }
- else {
- return mustClose;
- }
+ return mustClose != null && mustClose;
}
/**
@@ -322,13 +317,12 @@ public static boolean isMismatchedQueueArgs(Exception e) {
if (sig == null) {
return false;
}
- else {
- Method shutdownReason = sig.getReason();
- return shutdownReason instanceof AMQP.Channel.Close closeReason
- && AMQP.PRECONDITION_FAILED == closeReason.getReplyCode()
- && closeReason.getClassId() == QUEUE_CLASS_ID_50
- && closeReason.getMethodId() == DECLARE_METHOD_ID_10;
- }
+
+ Method shutdownReason = sig.getReason();
+ return shutdownReason instanceof AMQP.Channel.Close closeReason
+ && AMQP.PRECONDITION_FAILED == closeReason.getReplyCode()
+ && closeReason.getClassId() == QUEUE_CLASS_ID_50
+ && closeReason.getMethodId() == DECLARE_METHOD_ID_10;
}
/**
@@ -352,13 +346,12 @@ public static boolean isExchangeDeclarationFailure(Exception e) {
if (sig == null) {
return false;
}
- else {
- Method shutdownReason = sig.getReason();
- return shutdownReason instanceof AMQP.Connection.Close closeReason
- && AMQP.COMMAND_INVALID == closeReason.getReplyCode()
- && closeReason.getClassId() == EXCHANGE_CLASS_ID_40
- && closeReason.getMethodId() == DECLARE_METHOD_ID_10;
- }
+
+ Method shutdownReason = sig.getReason();
+ return shutdownReason instanceof AMQP.Channel.Close closeReason
+ && AMQP.PRECONDITION_FAILED == closeReason.getReplyCode()
+ && closeReason.getClassId() == EXCHANGE_CLASS_ID_40
+ && closeReason.getMethodId() == DECLARE_METHOD_ID_10;
}
/**
@@ -395,18 +388,13 @@ public static int getMaxFrame(ConnectionFactory connectionFactory) {
public static SaslConfig stringToSaslConfig(String saslConfig,
com.rabbitmq.client.ConnectionFactory connectionFactory) {
- switch (saslConfig) {
- case "DefaultSaslConfig.PLAIN":
- return DefaultSaslConfig.PLAIN;
- case "DefaultSaslConfig.EXTERNAL":
- return DefaultSaslConfig.EXTERNAL;
- case "JDKSaslConfig":
- return new JDKSaslConfig(connectionFactory);
- case "CRDemoSaslConfig":
- return new CRDemoMechanism.CRDemoSaslConfig();
- default:
- throw new IllegalStateException("Unrecognized SaslConfig: " + saslConfig);
- }
+ return switch (saslConfig) {
+ case "DefaultSaslConfig.PLAIN" -> DefaultSaslConfig.PLAIN;
+ case "DefaultSaslConfig.EXTERNAL" -> DefaultSaslConfig.EXTERNAL;
+ case "JDKSaslConfig" -> new JDKSaslConfig(connectionFactory);
+ case "CRDemoSaslConfig" -> new CRDemoMechanism.CRDemoSaslConfig();
+ default -> throw new IllegalStateException("Unrecognized SaslConfig: " + saslConfig);
+ };
}
/**
diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleResourceHolder.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleResourceHolder.java
index bacdf4a61f..10036d3ad2 100644
--- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleResourceHolder.java
+++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleResourceHolder.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2019 the original author or authors.
+ * Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -45,6 +45,7 @@
*
* @author Artem Bilan
* @author Gary Russell
+ * @author Ngoc Nhan
* @since 1.3
*/
public final class SimpleResourceHolder {
@@ -56,10 +57,10 @@ public final class SimpleResourceHolder {
private static final Log LOGGER = LogFactory.getLog(SimpleResourceHolder.class);
private static final ThreadLocal