Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tries to dispatch message even when failed to bind #3028

Open
guvenozgur opened this issue Nov 5, 2024 · 1 comment
Open

Tries to dispatch message even when failed to bind #3028

guvenozgur opened this issue Nov 5, 2024 · 1 comment

Comments

@guvenozgur
Copy link

guvenozgur commented Nov 5, 2024

I call streamBridge.send method inside a block that utilizes CompletableFuture.supplyAsync to interrupt execution when it takes longer time than expected.
At the first call after application started, messageChannel is not found inside the cache, so the producer tries to bind topic. Binding takes a bit long time and the thread is completed by the CompletableFuture.
Here is the flow after the time exceeds for the supplier run by CompletableFuture.supplyAsync

  1. InterruptedException is thrown inside the thread that tries to bind
  2. It is caught by AbstractMessageChannelBinder.doBindProducer and then BinderException exception is thrown
  3. BinderException is caught by BindingService.doBindProducer and then LateBinding object is returned
  4. Even though binding is not completed, message is tried to be sent in StreamBridge.send. Because resolveDestination method returns without error.
  5. UnicastingDispatcher.getHandlerIterator does not find the handler for the message and throws MessageDispatchingException(message, "Dispatcher has no subscribers");

I expect streamBridge.send to throw InterruptedException but it throws MessageDispatchingException. I could not get why it is required to send message when binding is unsuccessful.

Here is a sample code to reproduce the issue

public String publishMessage(String message) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        TestMessage testMessage = TestMessage.builder().command(Command.ADD_ITEM).message(message).build();

        try {
            return CompletableFuture
                    .supplyAsync(() -> customStreamBridge.publishEvent(testMessage.toPartitionedMessage()), executorService)
                    .exceptionallyAsync(throwable -> {
                        return null;
                    }).completeOnTimeout(null, 10, TimeUnit.MILLISECONDS).join();
        } finally {
            log.info("!!!");
            executorService.shutdownNow();
        }

    }
    
public class CustomStreamBridge {
    private final StreamBridge streamBridge;
    public EventStreamBridge(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    public String publishEvent(Message<?> message) {
        try {
            streamBridge.send("test-out-0", message);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // retry with timeout
            } else {
                throw e;
            }
        }
        return "OK";
    }
}    
@sobychacko sobychacko transferred this issue from spring-attic/spring-cloud-stream-binder-kafka Nov 5, 2024
@olegz
Copy link
Contributor

olegz commented Dec 19, 2024

Have you actually tried to set StreamBridge to async mode such as streamBridge.setAsync(true) before sending a message? This would avoid all that code you are writing to be able to send message asynch.
Also, if you stil believe there is an issue, please compile a small reproducible sample and push it to GitHub somewhere or attach it as zip so we can take a look,

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants