Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using AsyncRabbitTemplate. ConvertSendAndReceive return RabbitConverterFuture display Reply timed out, how to solve? #2478

Open
awen112 opened this issue Jul 5, 2023 · 9 comments · May be fixed by #2932
Milestone

Comments

@awen112
Copy link

awen112 commented Jul 5, 2023

Using AsyncRabbitTemplate. ConvertSendAndReceive return RabbitConverterFuture display Reply timed out, how to solve?

future.whenComplete((result, ex) -> {
if (ex == null) {
} else {
}
});

@artembilan
Copy link
Member

Not enough info.
Please, provide more code and stack trace.
Essentially, how to reproduce?

@awen112
Copy link
Author

awen112 commented Jul 5, 2023

信息不足。 请提供更多代码和堆栈跟踪。 本质上,如何重现?

org.springframework.amqp.core.AmqpReplyTimeoutException: Reply timed out
at org.springframework.amqp.rabbit.TimeoutTask.run(TimeoutTask.java:56)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)

@artembilan
Copy link
Member

Well, still not clear what you want from us.
The behavior is just expected:

this.future.completeExceptionally(
				new AmqpReplyTimeoutException("Reply timed out", this.future.getRequestMessage()));

So, you need to react to that exception respectively, but according to you description it is not clear what is your goal.

Probably your server side just don't send a reply back to you.
I don't think that public static final int DEFAULT_RECEIVE_TIMEOUT = 30000; is a problem here.

@garyrussell
Copy link
Contributor

Yes, it simply means that no reply was received within the timeout.

@garyrussell
Copy link
Contributor

Closing due to lack of response.

@garyrussell garyrussell closed this as not planned Won't fix, can't repro, duplicate, stale Aug 7, 2023
@BenEfrati
Copy link

BenEfrati commented Dec 15, 2024

If an exception is thrown by messageConverter.fromMessage(message) ,


it results in an AmqpReplyTimeoutException.
This happens because the directReplyToContainer in AsyncRabbitTemplate does not provide an option to set a custom ErrorHandler.

@artembilan
Copy link
Member

@BenEfrati ,
I believe that is a default behavior.
However you an use different constructor with custom error handler:

	/**
	 * Construct an instance using the provided arguments. The first queue the container
	 * is configured to listen to will be used as the reply queue. Replies will be
	 * routed using the default exchange with that queue name as the routing key.
	 * @param template a {@link RabbitTemplate}
	 * @param container a {@link AbstractMessageListenerContainer}.
	 */
	public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container) {

Same DirectReplyToMessageListenerContainer can be used here.

@BenEfrati
Copy link

@artembilan,
Tried that as well, it's not just the error handler, the rabbit future.exceptionally can't be called using the AsyncRabbitTemplate message listener

	/**
	 * Construct an instance using the provided arguments. The first queue the container
	 * is configured to listen to will be used as the reply queue. If 'replyAddress' is
	 * null, replies will be routed using the default exchange with that queue name as the
	 * routing key. Otherwise it should have the form exchange/routingKey and must
	 * cause messages to be routed to the reply queue.
	 * @param template a {@link RabbitTemplate}.
	 * @param container a {@link AbstractMessageListenerContainer}.
	 * @param replyAddress the reply address.
	 */
	public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container,
			String replyAddress) {
		Assert.notNull(template, "'template' cannot be null");
		Assert.notNull(container, "'container' cannot be null");
		this.template = template;
		this.container = container;
		this.container.setMessageListener(this); 
		this.directReplyToContainer = null;
		if (replyAddress == null) {
			this.replyAddress = container.getQueueNames()[0];
		}
		else {
			this.replyAddress = replyAddress;
		}
	}

...

@Override
	public void onMessage(Message message, Channel channel) {
		MessageProperties messageProperties = message.getMessageProperties();
		if (messageProperties != null) {
			String correlationId = messageProperties.getCorrelationId();
			if (StringUtils.hasText(correlationId)) {
				if (this.logger.isDebugEnabled()) {
					this.logger.debug("onMessage: " + message);
				}
				RabbitFuture<?> future = this.pending.remove(correlationId);
				if (future != null) {
					if (future instanceof RabbitConverterFuture) {
						MessageConverter messageConverter = this.template.getMessageConverter();
						RabbitConverterFuture<Object> rabbitFuture = (RabbitConverterFuture<Object>) future;
						Object converted = rabbitFuture.getReturnType() != null
								&& messageConverter instanceof SmartMessageConverter smart
								? smart.fromMessage(message,
								rabbitFuture.getReturnType())
								: messageConverter.fromMessage(message); //exception thrown here
						rabbitFuture.complete(converted);
					}
					else {
						((RabbitMessageFuture) future).complete(message);
					}
				}
				else {
					if (this.logger.isWarnEnabled()) {
						this.logger.warn("No pending reply - perhaps timed out: " + message);
					}
				}
			}
		}
	}

AbstractMessageListenerContainer.executeListenerAndHandleException doesn't call to rabbitFuture.completeExceptionally , leads to AmqpReplyTimeoutException

@artembilan
Copy link
Member

OK. So, you mean we have to add a try..catch into that onMessage() method and call rabbitFuture.completeExceptionally() from there?
Feel free to contribute the fix: https://github.com/spring-projects/spring-amqp/blob/main/CONTRIBUTING.adoc !

@artembilan artembilan added this to the 3.2.2 milestone Dec 17, 2024
@artembilan artembilan reopened this Dec 17, 2024
BenEfrati added a commit to BenEfrati/spring-amqp that referenced this issue Dec 18, 2024
…late

Previously, conversion errors in AsyncRabbitTemplate lead to
AmqpReplyTimeoutException
BenEfrati added a commit to BenEfrati/spring-amqp that referenced this issue Dec 18, 2024
…late

Previously, conversion errors in AsyncRabbitTemplate lead to
AmqpReplyTimeoutException
BenEfrati added a commit to BenEfrati/spring-amqp that referenced this issue Dec 18, 2024
BenEfrati added a commit to BenEfrati/spring-amqp that referenced this issue Dec 18, 2024
…late

Previously, conversion errors in AsyncRabbitTemplate lead to AmqpReplyTimeoutException
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants