PubSub Publisher 504 Deadline Exceeded when sending bulk messages #72
Comments
@dillonjohnson Can the issue be reproduced consistently? Asking because I tried it a few times (from my local machine), and it worked fine, even though the publishing took quite some time. The only difference on my side could be the message size; I just sent generic short-ish strings instead of potentially long JSON rows. What's the typical row length in the data you used? Update: I also tried with a CSV containing some public data on real estate sales (typical stringified row length ~250 bytes), same result, no error occurred.
@plamut Sorry for the delay. I should've given a better example. I do feel it is somewhat network-bound to my local machine, but we will be sending a fair amount of messages, so I don't want to outpace the network on K8s. For me locally, the script below crashes every time.

```python
from google.cloud import bigquery
from google.cloud import pubsub_v1
from time import sleep
import json

ack_count = 0
sent_count = 0


def get_callback(f):
    def callback(f):
        global ack_count
        ack_count += 1  # count the future as settled, whether it succeeded or failed
        try:
            f.result()  # re-raises the publish error, if any
        except Exception:  # noqa
            print("Please handle {}.".format(f.exception()))

    return callback


project_id = ''  # TODO: insert here
topic_name = ''  # TODO: insert here

bq = bigquery.Client()
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

# Pull 20k rows from a public BigQuery dataset and publish each one as JSON.
rows = bq.query("select * from `bigquery-public-data.samples.gsod` limit 20000")

for row in rows:
    sent_count += 1
    future = publisher.publish(topic_path, json.dumps(dict(row)).encode('utf-8'))
    future.add_done_callback(get_callback(future))

# Wait until every published message has been resolved.
while ack_count < sent_count:
    sleep(5)
```

This will work using public data, but our average message is somewhere around 2 KB long. If this doesn't produce the issue for you, you should be able to raise the row limit and hit it, I think. Thanks for the quick response.
@dillonjohnson Thanks for the updated sample, but I still wasn't able to reproduce the error even if I bumped the row limit to 200k (tried multiple times) - at least not without "cheating". I only had success if I briefly disabled my WiFi during the process, in which case I saw the 504 Deadline Exceeded errors. Would it be feasible for you to try running the script from a test K8s instance and see if the error is reproducible there as well?
@plamut It all depends on the upload bandwidth on the client side. In the US, on coax, you may only get 5 Mb/s up and are more likely to hit that error. It is pretty much unlikely on/within GCP. As per https://cloud.google.com/pubsub/docs/troubleshooting#publish-deadline-exceeded, rate limiting should be built into the client so that local use is supported. Another possible factor for the timeout is the gRPC serialization. As per https://cloud.google.com/blog/products/data-analytics/testing-cloud-pubsub-clients-to-maximize-streaming-performance, Python does not perform well and would require lots of CPUs to improve throughput. For "light clients", i.e. limited CPU and bandwidth, I am wondering if an HTTP GCF that takes in the (gzip-compressed) message and publishes it to Pub/Sub would not perform better. The calling script could even use … Thoughts?
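A minimal sketch of the kind of HTTP function described above, assuming a Python Cloud Functions runtime; the entry point name, environment variables, and single-message payload format are illustrative placeholders rather than anything from this issue:

```python
# Hypothetical HTTP Cloud Function sketch: accept a gzip-compressed payload over
# HTTP and republish it to Pub/Sub from inside GCP, where upload bandwidth is
# not the bottleneck. The function name and env vars below are placeholders.
import gzip
import os

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(os.environ["PROJECT_ID"], os.environ["TOPIC_NAME"])


def publish_gzipped(request):
    """HTTP entry point; the request body is a gzip-compressed message payload."""
    data = gzip.decompress(request.get_data())
    future = publisher.publish(topic_path, data)
    return future.result()  # the server-assigned message ID on success
```

The caller would gzip-compress each message (or a small batch) before POSTing it, trading some client CPU for a smaller upload.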
@yiga2
That's indeed a difference. I tested on a 40/40 Mbps home fiber optic connection with a decent router, which is probably why all the load went through fine. Publish rate limiting is something that is indeed worth considering IMO. There already exists a similar feature request, albeit for a different reason (to avoid consuming too much memory). In fact, this feature was already considered at some point. I can check what the current status is at the next weekly meeting and post when there's more info (cc: @kamalaboulhosn).
AFAIK, the underlying gRPC transport does not compress the payload by default. If the bandwidth on the client machine is the bottleneck, and the nature of the message data allows for significant compression gains, then performance gains might be achievable, although that would require additional CPU resources for compressing the data first... hard to say in a vacuum. @dillonjohnson If you reduce the number of messages to publish, does the error on your machine still occur? If it doesn't, that would confirm it's indeed an issue with the CPU/network bottleneck. Unfortunately, the publisher client currently does not do any throttling on its own; it only batches multiple successive messages into a single publish request.
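For reference, the batching thresholds mentioned above can be tuned when constructing the client; a minimal sketch with arbitrary illustrative values (a batch is sent as soon as any one threshold is reached):

```python
from google.cloud import pubsub_v1

# Illustrative thresholds; a batch is sent as soon as any one of them is hit.
batch_settings = pubsub_v1.types.BatchSettings(
    max_messages=500,        # messages per batch
    max_bytes=1024 * 1024,   # batch size in bytes
    max_latency=0.05,        # seconds to wait before sending a partial batch
)

publisher = pubsub_v1.PublisherClient(batch_settings)
```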
@plamut I've implemented simple throttling in the application I've built, using the logic from the original question (send x messages, wait for them to be sent, send more, and so on). Thanks for looking into this. I appreciate your time and help, and I look forward to future updates.
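A minimal sketch of that chunked send-and-wait approach, reusing the `rows`, `publisher`, `topic_path`, and `json` objects from the reproduction script above (the chunk size is arbitrary):

```python
# Publish in fixed-size chunks, blocking until each chunk has been fully
# resolved before starting the next one.
CHUNK_SIZE = 1000  # arbitrary

pending = []
for row in rows:
    data = json.dumps(dict(row)).encode("utf-8")
    pending.append(publisher.publish(topic_path, data))

    if len(pending) >= CHUNK_SIZE:
        for future in pending:
            future.result()  # blocks until the message is published (or raises)
        pending = []

# Wait for any remaining in-flight messages.
for future in pending:
    future.result()
```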
@dillonjohnson Thanks for the additional checks, and good that you've found a way around the problem! Since I can imagine other users facing similar issues, too, I'll bring this topic up at the next PubSub meeting and see what the plans are.
@dillonjohnson There is an upcoming feature (PR #96) that you might be interested in. It's not exactly throttling, but it still allows one to specify the desired thresholds for the number of in-flight messages being published or their total size, and the desired action to take if these thresholds are exceeded. The default is IGNORE, but one can also choose BLOCK or ERROR.
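In releases that ship this flow control feature, configuring it looks roughly like the sketch below; the limit values are illustrative and the exact types may differ between versions:

```python
from google.cloud import pubsub_v1

# Illustrative limits; with BLOCK, publish() waits whenever either limit on
# in-flight messages would be exceeded.
flow_control = pubsub_v1.types.PublishFlowControl(
    message_limit=1000,              # max number of in-flight messages
    byte_limit=10 * 1024 * 1024,     # max total size of in-flight messages
    limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
)

publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(flow_control=flow_control)
)
```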
Environment details

- OS type and version: Debian GNU/Linux 10 (buster)
- Python version (`python --version`): 3.7
- pip version (`pip --version`): 20.0.2
- `google-cloud-pubsub` version (`pip show google-cloud-pubsub`): 1.4.2

Steps to reproduce
Code example
Stack trace
Other Details:
I've found a similar issue here: googleapis/google-cloud-java#3867, which references this issue: googleapis/google-cloud-java#3003, and then googleapis/java-pubsub#31.
This will normally be running on GKE, so I understand that running it locally may behave somewhat differently due to network speed. However, we would prefer to avoid this issue either way.
Is this something that is planned/intended to be supported or should I implement auto-throttling?