From 9942e3c21c7241695cc7a1b8c25e7284ff6d9d94 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 11 Nov 2024 12:31:35 -0500 Subject: [PATCH] GH-2890: Fix `MessagingMessageListenerAdapter` for batch in Kotlin Fixes: #2890 Issue link: https://github.com/spring-projects/spring-amqp/issues/2890 The Kotlin function with signature `receiveBatch(messages: List)` produced a `WildCardType` for the generic of the `List` argument. * Fix `MessagingMessageListenerAdapter` to use `TypeUtils.isAssignable()` to determine if the `Type` has a part as expected type # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java # spring-rabbit/src/test/kotlin/org/springframework/amqp/rabbit/annotation/EnableRabbitKotlinTests.kt --- .../MessagingMessageListenerAdapter.java | 27 ++++++++++--------- .../annotation/EnableRabbitKotlinTests.kt | 14 +++++----- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java index 6e3579f8c3..465594de0a 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Optional; +import com.rabbitmq.client.Channel; + import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler; import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; @@ -43,8 +45,7 @@ import org.springframework.messaging.handler.annotation.Payload; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; - -import com.rabbitmq.client.Channel; +import org.springframework.util.TypeUtils; /** * A {@link org.springframework.amqp.core.MessageListener MessageListener} @@ -240,9 +241,9 @@ private void returnOrThrow(org.springframework.amqp.core.Message amqpMessage, Ch Object payload = message == null ? null : message.getPayload(); try { handleResult(new InvocationResult(new RemoteInvocationResult(throwableToReturn), null, - payload == null ? Object.class : this.handlerAdapter.getReturnTypeFor(payload), - this.handlerAdapter.getBean(), - payload == null ? null : this.handlerAdapter.getMethodFor(payload)), + payload == null ? Object.class : this.handlerAdapter.getReturnTypeFor(payload), + this.handlerAdapter.getBean(), + payload == null ? null : this.handlerAdapter.getMethodFor(payload)), amqpMessage, channel, message); } catch (ReplyFailureException rfe) { @@ -405,8 +406,8 @@ private Type determineInferredType() { // NOSONAR - complexity boolean isPayload = methodParameter.hasParameterAnnotation(Payload.class); if (isHeaderOrHeaders && isPayload && MessagingMessageListenerAdapter.this.logger.isWarnEnabled()) { MessagingMessageListenerAdapter.this.logger.warn(this.method.getName() - + ": Cannot annotate a parameter with both @Header and @Payload; " - + "ignored for payload conversion"); + + ": Cannot annotate a parameter with both @Header and @Payload; " + + "ignored for payload conversion"); } if (isEligibleParameter(methodParameter) // NOSONAR && (!isHeaderOrHeaders || isPayload) && !(isHeaderOrHeaders && isPayload)) { @@ -416,7 +417,7 @@ private Type determineInferredType() { // NOSONAR - complexity if (this.isBatch && !this.isCollection) { throw new IllegalStateException( "Mis-configuration; a batch listener must consume a List or " - + "Collection for method: " + this.method); + + "Collection for method: " + this.method); } } @@ -467,15 +468,16 @@ private Type extractGenericParameterTypFromMethodParameter(MethodParameter metho } else if (this.isBatch && ((parameterizedType.getRawType().equals(List.class) - || parameterizedType.getRawType().equals(Collection.class)) - && parameterizedType.getActualTypeArguments().length == 1)) { + || parameterizedType.getRawType().equals(Collection.class)) + && parameterizedType.getActualTypeArguments().length == 1)) { this.isCollection = true; Type paramType = parameterizedType.getActualTypeArguments()[0]; boolean messageHasGeneric = paramType instanceof ParameterizedType pType && pType.getRawType().equals(Message.class); - this.isMessageList = paramType.equals(Message.class) || messageHasGeneric; - this.isAmqpMessageList = paramType.equals(org.springframework.amqp.core.Message.class); + this.isMessageList = TypeUtils.isAssignable(paramType, Message.class) || messageHasGeneric; + this.isAmqpMessageList = + TypeUtils.isAssignable(paramType, org.springframework.amqp.core.Message.class); if (messageHasGeneric) { genericParameterType = ((ParameterizedType) paramType).getActualTypeArguments()[0]; } @@ -487,6 +489,7 @@ else if (this.isBatch } return genericParameterType; } + } } diff --git a/spring-rabbit/src/test/kotlin/org/springframework/amqp/rabbit/annotation/EnableRabbitKotlinTests.kt b/spring-rabbit/src/test/kotlin/org/springframework/amqp/rabbit/annotation/EnableRabbitKotlinTests.kt index 397fe765e8..c49a432181 100644 --- a/spring-rabbit/src/test/kotlin/org/springframework/amqp/rabbit/annotation/EnableRabbitKotlinTests.kt +++ b/spring-rabbit/src/test/kotlin/org/springframework/amqp/rabbit/annotation/EnableRabbitKotlinTests.kt @@ -66,7 +66,7 @@ class EnableRabbitKotlinTests { assertThat(result).isEqualTo("TEST") val listener = registry.getListenerContainer("single").messageListener assertThat(TestUtils.getPropertyValue(listener, "messagingMessageConverter.inferredArgumentType").toString()) - .isEqualTo("class java.lang.String") + .isEqualTo("class java.lang.String") } @Test @@ -83,17 +83,17 @@ class EnableRabbitKotlinTests { class Config { @RabbitListener(id = "single", queues = ["kotlinQueue"]) - suspend fun handle(@Suppress("UNUSED_PARAMETER") data: String) : String? { + suspend fun handle(@Suppress("UNUSED_PARAMETER") data: String): String? { return data.uppercase() } @Bean fun rabbitListenerContainerFactory(cf: CachingConnectionFactory) = - SimpleRabbitListenerContainerFactory().also { - it.setAcknowledgeMode(AcknowledgeMode.MANUAL) - it.setReceiveTimeout(10) - it.setConnectionFactory(cf) - } + SimpleRabbitListenerContainerFactory().also { + it.setAcknowledgeMode(AcknowledgeMode.MANUAL) + it.setReceiveTimeout(10) + it.setConnectionFactory(cf) + } @Bean fun cf() = CachingConnectionFactory(RabbitAvailableCondition.getBrokerRunning().connectionFactory)