Publish hangs with 'publisher confirms' enabled #149

Open
greatvovan opened this issue Jul 20, 2017 · 2 comments

Comments

@greatvovan

greatvovan commented Jul 20, 2017

My service reads messages from one queue and publishes the processed messages to another exchange.
I want to ack each incoming message only after the corresponding outgoing message has been published successfully. This worked well until I ran into some lost messages and decided to enable the 'publisher confirms' feature. After that, my program started to hang forever on publish(). Here is a simple example that illustrates the problem.

import asyncio
import aioamqp
from aioamqp.channel import Channel, Envelope


class RabbitClient:
    def __init__(self):
        self.consumer = None # type: Channel
        self.publisher = None  # type: Channel
        self.exchange_out = 'exchange_out'
        self.queue_out = 'queue_out'
        self.queue_in = 'queue_in'
        self.rabbit_url = 'amqp://rabbit'

    async def connect(self):
        # Connection
        _, protocol = await aioamqp.from_url(self.rabbit_url)
        self.consumer = await protocol.channel()
        self.publisher = await protocol.channel()

        # Enable publisher confirms
        await self.publisher.confirm_select()

        # Output
        await self.publisher.exchange(self.exchange_out, 'fanout', auto_delete=False,
                                      passive=False, durable=True)
        await self.publisher.queue(self.queue_out, durable=True, auto_delete=False)
        await self.publisher.queue_bind(exchange_name=self.exchange_out,
                                        queue_name=self.queue_out, routing_key='')

        # Input
        await self.consumer.queue(self.queue_in, durable=True, auto_delete=False)
        await self.consumer.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False)
        print('Connected')

    async def publish_message(self, msg: str):
        await self.consumer.publish(msg, '', self.queue_in)  # works here
        print('Message published: ' + msg)

    async def start_consuming(self):
        await self.consumer.basic_consume(queue_name=self.queue_in, callback=self.on_message)
        print('Consumer started')

    async def on_message(self, channel: Channel, payload: bytes, envelope: Envelope, _):
        await self.publish_and_ack(payload, envelope.delivery_tag)

    async def publish_and_ack(self, payload: bytes, delivery_tag: int):
        try:
            s = payload.decode('utf-8')
            print(f"Got message: '{s}', publishing to out exchange...")
            await self.publisher.publish(s + ' processed', self.exchange_out, '')   # HANGS HERE!
            print('Sending ack')
            await self.consumer.basic_client_ack(delivery_tag)
            print('Done')
        except Exception:
            await self.consumer.basic_client_nack(delivery_tag)

if __name__ == '__main__':
    client = RabbitClient()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(client.connect())
    loop.run_until_complete(client.publish_message('test1'))
    loop.run_until_complete(client.publish_message('test2'))
    loop.run_until_complete(client.start_consuming())
    loop.run_forever()

I experimented with it and found the following:

  • It works if I comment out await self.publisher.confirm_select().
  • It works if I run publish_and_ack() in a separate task (e.g. with asyncio.ensure_future()), but I dislike leaving the task 'free floating'.
  • It works if I use consumer or channel (which are the same object) in on_message() instead of publisher.
  • I cannot use a single channel for everything (only consumer or only publisher).
  • It also works if I call confirm_select() on a separate channel (e.g. publisher) and use another channel everywhere else.

This seems really weird. Am I doing everything correctly?
What is the best practice for using channels with this library?

@mwfrojdman
Contributor

There's an asyncio task running the dispatcher that reads messages from the server and calls handler methods; Channel.basic_deliver is called to deliver messages to your consumer callback (yield from callback(self, body, envelope, properties)). The problem is that the callback (your on_message function) is executed in the same task as the dispatcher, so until the callback returns, no further messages from the server are handled.
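To make the serialization concrete, here is a minimal model of that behaviour. The reader function and the queue of message bodies are hypothetical stand-ins, not aioamqp's actual dispatcher code, and the callback is simplified to take only the body instead of (channel, body, envelope, properties). It only shows that while the callback is being awaited inline, the next frame is not read.

```python
import asyncio


async def reader(incoming: asyncio.Queue, callback, log: list) -> None:
    """Hypothetical model of the library's dispatcher task (not its real code):
    frames are read one at a time and the consumer callback is awaited inline."""
    while True:
        body = await incoming.get()
        if body is None:  # sentinel standing in for "connection closed"
            return
        log.append(f"deliver {body}")
        await callback(body)  # the next frame is not read until this returns
        log.append(f"callback done {body}")


async def main() -> list:
    log = []
    incoming = asyncio.Queue()
    for body in ("test1", "test2", None):
        incoming.put_nowait(body)

    async def on_message(body: str) -> None:
        await asyncio.sleep(0.01)  # e.g. awaiting publish() inside the callback

    await reader(incoming, on_message, log)
    return log


print(asyncio.run(main()))
# ['deliver test1', 'callback done test1', 'deliver test2', 'callback done test2']
```

The second delivery only starts after the first callback has finished, regardless of how long the callback awaits.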

Confirm select changes the messaging with the server: for each basic.publish message you send (by calling Channel.publish), the server replies with an ack message, and publish does not return until it has received that ack. So what happens is that Channel.dispatch_frame is calling Channel.basic_deliver, which is stuck waiting for on_message to return; on_message is calling publish_and_ack, which waits for an ack from the server; but Channel.dispatch_frame is stuck and cannot read that next message.
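The deadlock described above can be reproduced without a broker, as a rough sketch. FakeChannel, reader and demo are hypothetical names, not aioamqp classes; the "broker" is modelled by publish() putting an ack frame onto the same incoming queue that the single reader task consumes. The timeout exists only to make the hang observable. The inline=False branch spawns the callback as a separate task, which corresponds to the create_task/ensure_future workaround mentioned in this thread.

```python
import asyncio
import contextlib


class FakeChannel:
    """Hypothetical channel in confirm mode (not aioamqp's Channel):
    publish() returns only after the reader task has processed the broker's ack."""

    def __init__(self, incoming: asyncio.Queue):
        self.incoming = incoming
        self.pending = {}  # delivery tag -> future resolved when the ack is read
        self.next_tag = 1

    async def publish(self, payload: str) -> None:
        tag = self.next_tag
        self.next_tag += 1
        fut = asyncio.get_running_loop().create_future()
        self.pending[tag] = fut
        self.incoming.put_nowait(("ack", tag))  # the "broker" acks on the same connection
        await fut  # only the reader task can resolve this future


async def reader(incoming: asyncio.Queue, channel: FakeChannel, on_message, inline: bool):
    """Single task that reads frames and dispatches them, like the library's dispatcher."""
    while True:
        kind, value = await incoming.get()
        if kind == "deliver":
            if inline:
                await on_message(value)  # blocks the reader until the callback returns
            else:
                asyncio.create_task(on_message(value))  # untracked task; reader keeps going
        elif kind == "ack":
            channel.pending.pop(value).set_result(None)


async def demo(inline: bool) -> str:
    incoming = asyncio.Queue()
    channel = FakeChannel(incoming)
    published = asyncio.Event()

    async def on_message(body: str) -> None:
        await channel.publish(body + " processed")
        published.set()

    incoming.put_nowait(("deliver", "test1"))
    task = asyncio.create_task(reader(incoming, channel, on_message, inline))
    try:
        await asyncio.wait_for(published.wait(), timeout=0.1)
        return "published"
    except asyncio.TimeoutError:
        return "deadlock"  # the ack is queued, but the reader is stuck in on_message
    finally:
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task


print(asyncio.run(demo(inline=True)))   # deadlock
print(asyncio.run(demo(inline=False)))  # published
```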

If I'm reading the code correctly, having another channel should not help if confirm_select() is used, because the connection's main dispatcher just delegates to the channels and doesn't process the next message from the server until the channel method returns. Publisher confirms are channel-specific, though, so it may work simply because no confirms are actually used on that channel.
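A minimal sketch of the "confirms are channel-specific" explanation, assuming that a channel which never called confirm_select() does not wait for any ack after publishing. ModelChannel and publish_completes are hypothetical names, not aioamqp's API; the ack future is never resolved here, mimicking a reader task that is stuck inside on_message.

```python
import asyncio


class ModelChannel:
    """Hypothetical model (not aioamqp's Channel): only a channel that has called
    confirm_select() waits for a broker ack after each publish."""

    def __init__(self, confirm_mode: bool):
        self.confirm_mode = confirm_mode
        self.unacked = []  # futures that the (blocked) reader task would resolve

    async def publish(self, payload: str) -> None:
        # A real client would write a basic.publish frame here.
        if not self.confirm_mode:
            return  # no confirms on this channel: nothing to wait for
        ack = asyncio.get_running_loop().create_future()
        self.unacked.append(ack)
        await ack  # never resolved here, mimicking a reader stuck in on_message


async def publish_completes(channel: ModelChannel, timeout: float = 0.05) -> bool:
    try:
        await asyncio.wait_for(channel.publish("test1 processed"), timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def main() -> dict:
    return {
        "publisher (confirm_select called)": await publish_completes(ModelChannel(True)),
        "consumer (no confirms)": await publish_completes(ModelChannel(False)),
    }


print(asyncio.run(main()))
# {'publisher (confirm_select called)': False, 'consumer (no confirms)': True}
```

Under this assumption, publishing on a channel without confirm mode "works" only because nothing is confirmed on it.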

One workaround is to call loop.create_task for publish_and_ack(), which allows the library's dispatcher to process the next messages. But then you have to clean up those tasks somewhere else once they finish, to avoid warnings.
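One way to handle that cleanup, sketched with plain asyncio: a hypothetical TaskTracker helper (not part of aioamqp) that keeps a strong reference to each spawned handler task, drops it when the task finishes, and retrieves its exception so Python does not warn that it was never retrieved. In the original program, on_message would call tracker.spawn(self.publish_and_ack(payload, envelope.delivery_tag)) instead of awaiting it; that wiring is an assumption and is not shown running against a broker here.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class TaskTracker:
    """Hypothetical helper (not part of aioamqp): runs consumer callbacks as
    separate tasks so the reader task is never blocked, keeps strong references
    to them, and retrieves their exceptions when they finish."""

    def __init__(self) -> None:
        self._tasks = set()
        self.errors = []

    def spawn(self, coro) -> asyncio.Task:
        task = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(task)  # strong reference: the event loop only keeps weak ones
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task: asyncio.Task) -> None:
        self._tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()  # marks the exception as retrieved
        if exc is not None:
            self.errors.append(exc)
            logger.error("message handler failed", exc_info=exc)

    async def drain(self) -> None:
        """Wait for in-flight handlers, e.g. before closing the connection."""
        while self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)


async def main() -> list:
    tracker = TaskTracker()

    async def on_message(body: str) -> None:
        await asyncio.sleep(0)  # stand-in for publish + ack
        if body == "bad":
            raise ValueError(body)

    for body in ("test1", "bad", "test2"):
        tracker.spawn(on_message(body))  # returns immediately; the reader keeps reading
    await tracker.drain()
    return [str(e) for e in tracker.errors]


print(asyncio.run(main()))  # ['bad']
```

Note that with prefetch_count=1 the broker still sends only one unacked message at a time, so spawning mainly unblocks the reader rather than adding parallelism.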

@greatvovan
Author

If I'm reading the code correctly, having another channel should not help if confirm_select() is used, because the connection's main dispatcher just delegates to the channels

But it does help. I tried setting up yet another (third) channel, and it works with that one too. To sum up:

  • consumer channel – works,
  • publisher channel – does not work,
  • yet_another_channel – works.

It looks like it fails only on the channel that was used to call confirm_select().

One workaround is to call loop.create_task for publish_and_ack()

Yes, I mentioned this in the second point of my initial post. I don't like it, and meanwhile the other workaround (enabling confirms on a separate channel) works pretty well. Are there any advantages to using your workaround?
