diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java index 077eacb7de..5d6753f0b9 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,6 +50,7 @@ import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.SmartMessageConverter; import org.springframework.amqp.utils.JavaUtils; import org.springframework.beans.factory.BeanNameAware; @@ -603,12 +604,16 @@ public void onMessage(Message message, Channel channel) { if (future instanceof RabbitConverterFuture) { MessageConverter messageConverter = this.template.getMessageConverter(); RabbitConverterFuture rabbitFuture = (RabbitConverterFuture) future; - Object converted = rabbitFuture.getReturnType() != null + try { + Object converted = rabbitFuture.getReturnType() != null && messageConverter instanceof SmartMessageConverter smart ? smart.fromMessage(message, rabbitFuture.getReturnType()) : messageConverter.fromMessage(message); - rabbitFuture.complete(converted); + rabbitFuture.complete(converted); + } catch (MessageConversionException e) { + rabbitFuture.completeExceptionally(e); + } } else { ((RabbitMessageFuture) future).complete(message); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java index 1c72c9db77..aadf18d773 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java @@ -54,6 +54,7 @@ import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.amqp.rabbit.listener.adapter.ReplyingMessageListener; +import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.amqp.support.postprocessor.GUnzipPostProcessor; import org.springframework.amqp.support.postprocessor.GZipPostProcessor; @@ -386,6 +387,34 @@ public void testStopCancelled() throws Exception { assertThat(callback.result).isNull(); } + @Test + public void testConversionException() throws InterruptedException { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); + connectionFactory.setChannelCacheSize(1); + RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + rabbitTemplate.setMessageConverter(new SimpleMessageConverter(){ + @Override + public Object fromMessage(Message message) throws MessageConversionException { + throw new MessageConversionException("Failed to convert message"); + } + }); + AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate); + asyncRabbitTemplate.start(); + + RabbitConverterFuture replyFuture = asyncRabbitTemplate.convertSendAndReceive("conversionException"); + + CountDownLatch cdl = new CountDownLatch(1); + replyFuture.whenComplete((result, ex) -> { + cdl.countDown(); + }); + assertThat(cdl.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(replyFuture).isCompletedExceptionally(); + + asyncRabbitTemplate.stop(); + connectionFactory.destroy(); + } + + @Test void ctorCoverage() { AsyncRabbitTemplate template = new AsyncRabbitTemplate(mock(ConnectionFactory.class), "ex", "rk"); @@ -603,7 +632,7 @@ public SimpleMessageListenerContainer remoteContainer(ConnectionFactory connecti } else if ("noReply".equals(message)) { return null; - } + } return message.toUpperCase(); });