Skip to content

Commit

Permalink
GH-2890: Fix MessagingMessageListenerAdapter for batch in Kotlin
Browse files Browse the repository at this point in the history
Fixes: #2890
Issue link: #2890

The Kotlin function with signature `receiveBatch(messages: List<Message>)`
produced a `WildCardType` for the generic of the `List` argument.

* Fix `MessagingMessageListenerAdapter` to use `TypeUtils.isAssignable()`
to determine if the `Type` has a part as expected type

# Conflicts:
#	spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java
#	spring-rabbit/src/test/kotlin/org/springframework/amqp/rabbit/annotation/EnableRabbitKotlinTests.kt
  • Loading branch information
artembilan committed Nov 11, 2024
1 parent ad20705 commit 9942e3c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.List;
import java.util.Optional;

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
Expand All @@ -43,8 +45,7 @@
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;

import com.rabbitmq.client.Channel;
import org.springframework.util.TypeUtils;

/**
* A {@link org.springframework.amqp.core.MessageListener MessageListener}
Expand Down Expand Up @@ -240,9 +241,9 @@ private void returnOrThrow(org.springframework.amqp.core.Message amqpMessage, Ch
Object payload = message == null ? null : message.getPayload();
try {
handleResult(new InvocationResult(new RemoteInvocationResult(throwableToReturn), null,
payload == null ? Object.class : this.handlerAdapter.getReturnTypeFor(payload),
this.handlerAdapter.getBean(),
payload == null ? null : this.handlerAdapter.getMethodFor(payload)),
payload == null ? Object.class : this.handlerAdapter.getReturnTypeFor(payload),
this.handlerAdapter.getBean(),
payload == null ? null : this.handlerAdapter.getMethodFor(payload)),
amqpMessage, channel, message);
}
catch (ReplyFailureException rfe) {
Expand Down Expand Up @@ -405,8 +406,8 @@ private Type determineInferredType() { // NOSONAR - complexity
boolean isPayload = methodParameter.hasParameterAnnotation(Payload.class);
if (isHeaderOrHeaders && isPayload && MessagingMessageListenerAdapter.this.logger.isWarnEnabled()) {
MessagingMessageListenerAdapter.this.logger.warn(this.method.getName()
+ ": Cannot annotate a parameter with both @Header and @Payload; "
+ "ignored for payload conversion");
+ ": Cannot annotate a parameter with both @Header and @Payload; "
+ "ignored for payload conversion");
}
if (isEligibleParameter(methodParameter) // NOSONAR
&& (!isHeaderOrHeaders || isPayload) && !(isHeaderOrHeaders && isPayload)) {
Expand All @@ -416,7 +417,7 @@ private Type determineInferredType() { // NOSONAR - complexity
if (this.isBatch && !this.isCollection) {
throw new IllegalStateException(
"Mis-configuration; a batch listener must consume a List<?> or "
+ "Collection<?> for method: " + this.method);
+ "Collection<?> for method: " + this.method);
}

}
Expand Down Expand Up @@ -467,15 +468,16 @@ private Type extractGenericParameterTypFromMethodParameter(MethodParameter metho
}
else if (this.isBatch
&& ((parameterizedType.getRawType().equals(List.class)
|| parameterizedType.getRawType().equals(Collection.class))
&& parameterizedType.getActualTypeArguments().length == 1)) {
|| parameterizedType.getRawType().equals(Collection.class))
&& parameterizedType.getActualTypeArguments().length == 1)) {

this.isCollection = true;
Type paramType = parameterizedType.getActualTypeArguments()[0];
boolean messageHasGeneric = paramType instanceof ParameterizedType pType
&& pType.getRawType().equals(Message.class);
this.isMessageList = paramType.equals(Message.class) || messageHasGeneric;
this.isAmqpMessageList = paramType.equals(org.springframework.amqp.core.Message.class);
this.isMessageList = TypeUtils.isAssignable(paramType, Message.class) || messageHasGeneric;
this.isAmqpMessageList =
TypeUtils.isAssignable(paramType, org.springframework.amqp.core.Message.class);
if (messageHasGeneric) {
genericParameterType = ((ParameterizedType) paramType).getActualTypeArguments()[0];
}
Expand All @@ -487,6 +489,7 @@ else if (this.isBatch
}
return genericParameterType;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class EnableRabbitKotlinTests {
assertThat(result).isEqualTo("TEST")
val listener = registry.getListenerContainer("single").messageListener
assertThat(TestUtils.getPropertyValue(listener, "messagingMessageConverter.inferredArgumentType").toString())
.isEqualTo("class java.lang.String")
.isEqualTo("class java.lang.String")
}

@Test
Expand All @@ -83,17 +83,17 @@ class EnableRabbitKotlinTests {
class Config {

@RabbitListener(id = "single", queues = ["kotlinQueue"])
suspend fun handle(@Suppress("UNUSED_PARAMETER") data: String) : String? {
suspend fun handle(@Suppress("UNUSED_PARAMETER") data: String): String? {
return data.uppercase()
}

@Bean
fun rabbitListenerContainerFactory(cf: CachingConnectionFactory) =
SimpleRabbitListenerContainerFactory().also {
it.setAcknowledgeMode(AcknowledgeMode.MANUAL)
it.setReceiveTimeout(10)
it.setConnectionFactory(cf)
}
SimpleRabbitListenerContainerFactory().also {
it.setAcknowledgeMode(AcknowledgeMode.MANUAL)
it.setReceiveTimeout(10)
it.setConnectionFactory(cf)
}

@Bean
fun cf() = CachingConnectionFactory(RabbitAvailableCondition.getBrokerRunning().connectionFactory)
Expand Down

0 comments on commit 9942e3c

Please sign in to comment.