Python Pubsub Subscriber client not pulling messages when filter is applied on subscription #1026

Open
ff-sdesai opened this issue Nov 24, 2023 · 12 comments

ff-sdesai commented Nov 24, 2023

I am using a Pub/Sub streaming pull subscription in my Python web application. When no subscription filter is applied, the subscriber client pulls messages from the subscription successfully. When a subscription filter is applied, however, the subscriber stops pulling messages. If I go to the subscription manually and click 'Pull', I can see that there are messages in the subscription (they evidently matched the filter criteria, since they are present in the subscription). But the client cannot pull any of these messages. Do I need any additional configuration for the client? The code for my subscriber client is as follows:

    import os

    from google.cloud import pubsub_v1
    from app.services.subscription_service import save_bill_events
    from app.utils.constants import BILL_SUBSCRIPTION_GCP_PROJECT_ID, BILL_EVENT_SUBSCRIPTION_ID
    from app.utils.logging_tracing_manager import get_logger

    logger = get_logger(__file__)


    def callback(message: pubsub_v1.subscriber.message.Message) -> None:
        save_bill_events(message.data)
        message.ack()


    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        os.environ.get(BILL_SUBSCRIPTION_GCP_PROJECT_ID), BILL_EVENT_SUBSCRIPTION_ID
    )

    # Limit the subscriber to a fixed number of outstanding messages at a time.
    flow_control = pubsub_v1.types.FlowControl(max_messages=50)
    streaming_pull_future = subscriber.subscribe(
        subscription_path, callback=callback, flow_control=flow_control
    )


    async def poll_bill_subscription():
        with subscriber:
            try:
                # When `timeout` is not set, result() will block indefinitely,
                # unless an exception is encountered first.
                streaming_pull_future.result()
            except Exception:
                # Even in case of an exception, subscriber should keep listening
                logger.error(
                    f"An error occurred while pulling message from subscription {BILL_EVENT_SUBSCRIPTION_ID}",
                    exc_info=True,
                )
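
A side note on the snippet above, offered as a sketch rather than a diagnosis: `streaming_pull_future.result()` is a blocking call, so calling it directly inside an `async def` holds the event loop until the future finishes. One way to avoid that, assuming Python 3.9+ for `asyncio.to_thread`, is to wait in a worker thread. The helper name `wait_for_streaming_pull` is hypothetical, and nothing here is known to be related to the filter behaviour reported in this issue.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def wait_for_streaming_pull(streaming_pull_future) -> None:
    """Wait for a streaming pull future without blocking the event loop.

    `streaming_pull_future` is whatever `subscriber.subscribe(...)` returned;
    only its `result()` and `cancel()` methods are used here.
    """
    try:
        # result() blocks, so run it in a worker thread instead of on the loop.
        await asyncio.to_thread(streaming_pull_future.result)
    except Exception:
        logger.exception("Streaming pull stopped with an error")
        # Ask the background stream to shut down; the caller decides
        # whether to call subscribe() again afterwards.
        streaming_pull_future.cancel()
```

Note that the original code's comment says the subscriber should keep listening after an exception; neither the original nor this sketch resubscribes, so that would need an explicit retry around `subscribe()`.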

The filter applied to the subscription is as follows:
attributes.tenant_id = "1" OR attributes.tenant_id="e8a63d46-35bf-5e1c-acec-5d2495b7ae59" OR attributes.tenant_id="2" OR attributes.tenant_id="3"

Environment details

  • OS type and version: Deployed in GCP Kubernetes
  • Python version: 3.10
  • pip version: pip 22.0.2
  • google-cloud-pubsub version: 2.18.4
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/python-pubsub API. label Nov 24, 2023
@ff-sdesai
Author

The strange thing is that it works intermittently. It worked two or three times and then stopped working again with the same tenant_id.

@ff-sdesai
Author

Update: it has stopped working again. I tried downgrading the google-cloud-pubsub SDK to 2.18.1, but that made no difference.
I am not sure how it worked a few times on 2.18.4 and then stopped working again without any code change.

@ff-sdesai
Author

@pradn I think I have found the reason behind this behavior. We create a subscription with a filter first and then, immediately afterwards, create a subscriber using the SDK. When there is no filter, the subscriber is created successfully and starts receiving messages.
However, when I specify a filter, it looks like the subscriber is not able to subscribe successfully, although no error is shown. It starts working fine after a redeployment/restart. Can you confirm whether this is the case, and is there any workaround to handle it?

@pradn
Contributor

pradn commented Dec 13, 2023

> However, when I specify a filter, it looks like the subscriber is not able to subscribe successfully although there is no error shown. It starts working fine after a redeployment/restart. Can you confirm if this is the case and is there any workaround to handle this?

Are you sure there are messages that satisfy the filter criteria? Subscription creation works fine even if the filter matches no messages. When you pulled in Pantheon (the Cloud Console), are you sure it was from the subscription with the filter? Can you verify that the messages have the attributes you expect?
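
One way to check the attribute question above is to test sample message attributes locally against the same tenant IDs the filter names. The following is a minimal sketch that mirrors only this specific filter (`attributes.tenant_id = "..." OR ...`); it is not Pub/Sub's filter evaluator, and the names `ALLOWED_TENANT_IDS` and `matches_tenant_filter` are hypothetical.

```python
from typing import Mapping

# Tenant IDs copied from the subscription filter quoted in this issue.
ALLOWED_TENANT_IDS = frozenset(
    {"1", "2", "3", "e8a63d46-35bf-5e1c-acec-5d2495b7ae59"}
)


def matches_tenant_filter(
    attributes: Mapping[str, str],
    allowed: frozenset = ALLOWED_TENANT_IDS,
) -> bool:
    """Return True if the `tenant_id` attribute equals one of the allowed IDs.

    A missing key, or a differently spelled key such as `tenantId`,
    does not match.
    """
    return attributes.get("tenant_id") in allowed


print(matches_tenant_filter({"tenant_id": "1"}))
print(matches_tenant_filter({"tenantId": "1"}))
```

Logging the result of such a check on the publisher side would show whether a message that was expected to match carries the attribute key and value the filter looks for.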

No additional settings are required on the subscriber client to pull from a subscription with a filter.

@ff-sdesai
Author

Yes. If I do not add a sleep of at least 1 second after creating the subscription, I can see the messages in the subscription, but the subscriber client (most probably) does not pull them. If I add the sleep, it works flawlessly.
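
The workaround described above might look roughly like the sketch below. It assumes `subscriber` is a `pubsub_v1.SubscriberClient` (it is passed in, so the sketch only relies on its `create_subscription` and `subscribe` methods). The function name `create_then_subscribe` and the `settle_seconds` parameter are hypothetical, and the 1-second default is simply the value reported in this thread, not a documented requirement.

```python
import time
from typing import Callable


def create_then_subscribe(
    subscriber,
    subscription_path: str,
    topic_path: str,
    filter_expression: str,
    callback: Callable,
    settle_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
):
    """Create a filtered subscription, pause briefly, then start streaming pull."""
    subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "filter": filter_expression,
        }
    )
    # Empirical pause reported in this issue; the reason it helps is unconfirmed.
    sleep(settle_seconds)
    # Returns the streaming pull future so the caller can wait on or cancel it.
    return subscriber.subscribe(subscription_path, callback=callback)
```

Passing `sleep` as a parameter keeps the delay easy to change or stub out while investigating how short it can be before the problem reappears.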

@pradn
Contributor

pradn commented Dec 14, 2023

I forgot something even more basic. A subscription only receives messages that were published after the subscription was created. So a subscription with a filter won't receive messages published before its creation (we don't go back and filter through previously published messages).

Maybe what's happening when you add a sleep is that the messages published during that 1 second are being delivered?
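
The point above about creation order can be illustrated with a toy in-memory model. This is a sketch of the described semantics only (a subscription sees messages published after it was created and that pass its filter); `ToyTopic` and `ToySubscription` are hypothetical classes and do not use or imitate the Pub/Sub client API.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List

Attributes = Dict[str, str]


@dataclass
class ToySubscription:
    accepts: Callable[[Attributes], bool]
    backlog: List[Attributes] = field(default_factory=list)


class ToyTopic:
    def __init__(self) -> None:
        self._subscriptions: List[ToySubscription] = []

    def create_subscription(
        self, accepts: Callable[[Attributes], bool] = lambda attrs: True
    ) -> ToySubscription:
        subscription = ToySubscription(accepts)
        self._subscriptions.append(subscription)
        return subscription

    def publish(self, attributes: Attributes) -> None:
        # Only subscriptions that already exist can receive the message.
        for subscription in self._subscriptions:
            if subscription.accepts(attributes):
                subscription.backlog.append(attributes)


topic = ToyTopic()
topic.publish({"tenant_id": "1"})  # published before the subscription exists
sub = topic.create_subscription(lambda attrs: attrs.get("tenant_id") == "1")
topic.publish({"tenant_id": "1"})  # matches the filter, delivered
topic.publish({"tenant_id": "9"})  # filtered out
print(len(sub.backlog))  # prints 1
```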

@ff-sdesai
Author

I am aware of that. As I mentioned above, I can see the messages in the subscription if I click 'Pull' manually. It is only the subscriber that never pulls those messages if it is created right after a subscription that has a filter. If I create a subscription without a filter, the subscriber works without any added delay.
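
To make the manual 'Pull' check above repeatable from code, a synchronous pull can be issued through the same SDK. The sketch below assumes `subscriber` is a `pubsub_v1.SubscriberClient` and uses its `pull` method; the helper name `peek_attributes` is hypothetical. It does not acknowledge anything, so the messages become available for redelivery once their ack deadline expires. Until then they are leased to this pull, which means a streaming subscriber may not see them in the meantime.

```python
from typing import Dict, List


def peek_attributes(
    subscriber,
    subscription_path: str,
    max_messages: int = 10,
    timeout: float = 10.0,
) -> List[Dict[str, str]]:
    """Synchronously pull up to `max_messages` and return their attributes.

    Messages are intentionally not acknowledged.
    """
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": max_messages},
        timeout=timeout,
    )
    return [
        dict(received.message.attributes)
        for received in response.received_messages
    ]
```

Comparing the output of this helper with what the streaming pull callback receives would show whether the difference lies between the two pull modes or elsewhere.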

@pradn
Contributor

pradn commented Dec 15, 2023

Can you please share code snippets for the two scenarios, with comments explaining what messages are pulled in each case?

@mohammadtapad

Any updates on this? We're using the Pub/Sub operator in Airflow, and it fails to fetch the messages.

@mukund-ananthu
Contributor

Hi @mohammadtapad, could you please file a customer ticket so that this issue may be triaged accordingly? Thanks!

@mukund-ananthu
Contributor

> I am aware about it. And as I have mentioned above, I can see the messages in Subscription if I click on 'pull' manually. Only the subscriber can never pull those messages if it is created right after the subscription having a filter. If I create a subscription without a filter, subscriber works without adding any delay

> Can you please share code snippets for the two scenarios, with comments explaining what messages are pulled in each case?

@ff-sdesai, do you have the exact steps to deterministically reproduce the issue?

@ff-sdesai
Author

@mukund-ananthu The steps to reproduce are as follows:

  • Create a Pub/Sub subscription using the Python Pub/Sub SDK. The subscription should have a valid filter configured.
  • Create a subscriber and start polling for messages.

In my experience, the subscriber did not get created when a filter was applied to the subscription. However, after restarting the application, the subscriber starts successfully. It looks like the issue occurs when a subscriber is created immediately after creating a subscription with a filter.

However, I will not be able to share any code, as that would be against my organization's policy.
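
Since no code could be shared, the two steps above could be turned into a small harness along the following lines. This is a sketch under assumptions, not the reporter's code: `subscriber` is assumed to be a `pubsub_v1.SubscriberClient`, the function name `run_repro` and the `window_seconds` parameter are hypothetical, and matching messages are assumed to be published by some other process while the harness is listening.

```python
import concurrent.futures
import threading
from typing import Dict, List


def run_repro(
    subscriber,
    subscription_path: str,
    topic_path: str,
    filter_expression: str,
    window_seconds: float = 30.0,
) -> List[Dict[str, str]]:
    """Create a filtered subscription, subscribe at once, and collect messages."""
    received: List[Dict[str, str]] = []
    lock = threading.Lock()

    def callback(message) -> None:
        # Callbacks may run on several threads, so guard the shared list.
        with lock:
            received.append(dict(message.attributes))
        message.ack()

    # Step 1: create the subscription with a filter.
    subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "filter": filter_expression,
        }
    )
    # Step 2: start the streaming pull immediately, with no delay.
    future = subscriber.subscribe(subscription_path, callback=callback)
    try:
        future.result(timeout=window_seconds)
    except concurrent.futures.TimeoutError:
        future.cancel()  # trigger shutdown of the stream
        future.result()  # block until the shutdown completes
    return received
```

An empty result while a manual pull still shows matching messages would correspond to the behaviour reported here; running the same harness with an empty filter string would give the comparison case.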
