Skip to content

Commit

Permalink
spring-projectsGH-2478 Handle conversion exception in AsyncRabbitTemp…
Browse files Browse the repository at this point in the history
…late

Previously, conversion errors in AsyncRabbitTemplate lead to
AmqpReplyTimeoutException
  • Loading branch information
BenEfrati committed Dec 18, 2024
1 parent d7058bb commit 2e93718
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,6 +50,7 @@
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.SmartMessageConverter;
import org.springframework.amqp.utils.JavaUtils;
import org.springframework.beans.factory.BeanNameAware;
Expand Down Expand Up @@ -603,12 +604,16 @@ public void onMessage(Message message, Channel channel) {
if (future instanceof RabbitConverterFuture) {
MessageConverter messageConverter = this.template.getMessageConverter();
RabbitConverterFuture<Object> rabbitFuture = (RabbitConverterFuture<Object>) future;
Object converted = rabbitFuture.getReturnType() != null
try {
Object converted = rabbitFuture.getReturnType() != null
&& messageConverter instanceof SmartMessageConverter smart
? smart.fromMessage(message,
rabbitFuture.getReturnType())
: messageConverter.fromMessage(message);
rabbitFuture.complete(converted);
rabbitFuture.complete(converted);
} catch (MessageConversionException e) {
rabbitFuture.completeExceptionally(e);
}
}
else {
((RabbitMessageFuture) future).complete(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.listener.adapter.ReplyingMessageListener;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.support.postprocessor.GUnzipPostProcessor;
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
Expand Down Expand Up @@ -386,6 +387,34 @@ public void testStopCancelled() throws Exception {
assertThat(callback.result).isNull();
}

@Test
public void testConversionException() throws InterruptedException {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setChannelCacheSize(1);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new SimpleMessageConverter(){
@Override
public Object fromMessage(Message message) throws MessageConversionException {
throw new MessageConversionException("Failed to convert message");
}
});
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
asyncRabbitTemplate.start();

RabbitConverterFuture<String> replyFuture = asyncRabbitTemplate.convertSendAndReceive("conversionException");

CountDownLatch cdl = new CountDownLatch(1);
replyFuture.whenComplete((result, ex) -> {
cdl.countDown();
});
assertThat(cdl.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(replyFuture).isCompletedExceptionally();

asyncRabbitTemplate.stop();
connectionFactory.destroy();
}


@Test
void ctorCoverage() {
AsyncRabbitTemplate template = new AsyncRabbitTemplate(mock(ConnectionFactory.class), "ex", "rk");
Expand Down Expand Up @@ -603,7 +632,7 @@ public SimpleMessageListenerContainer remoteContainer(ConnectionFactory connecti
}
else if ("noReply".equals(message)) {
return null;
}
}
return message.toUpperCase();
});

Expand Down

0 comments on commit 2e93718

Please sign in to comment.