Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add flow control for message publishing #96

Merged
merged 13 commits into from
Jun 2, 2020

Conversation

plamut
Copy link
Contributor

@plamut plamut commented May 7, 2020

Closes #16.
Closes #72.

This PR adds flow control settings to publisher client, similar to what was added to the Java client recently.

The PR does not interfere with existing publish logic (sequencers, ordering keys, batching...). Instead, it acts like a valve placed in front of the publish pipeline, and taking the configured action if it detects excessive message flow.

Things left to do / discuss

  • A system test for this? Although not sure how feasible would it be, as we would have to publish massive amounts of data on fast connection to have a chance of triggering the flow control error...
  • If the desired behavior is BLOCK, should blocking also support timeouts? However, every publish future gets resolved sooner or later be done (even if due to an error/timeout), and will release the corresponding message from the flow controller then.
  • Unlike Java, the FlowControl settings are placed under PublisherOptions, and not BatchSettings. They fit there much more naturally IMO, but I can change this.
  • What should be the default FlowControl values (the thresholds, specifically)? I set the message count and byte size to 10x the defaults in BatchSettings, would that be fine for most users?
  • Documenting the new feature more extensively than docstring updates in this PR? Can in principle also be done in a separate PR.

PR checklist

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

@plamut plamut requested review from pradn and kamalaboulhosn May 7, 2020 13:52
@googlebot googlebot added the cla: yes This human has signed the Contributor License Agreement. label May 7, 2020
@plamut plamut added the type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. label May 8, 2020
@plamut
Copy link
Contributor Author

plamut commented May 11, 2020

I added support for multithreaded add(), including the logic to prevent threads adding large messages from starving.

Things left:

  • Improve debug log messages (include info on the current flow controller capacity)
  • ERROR path: raise overflow errors through the publish future, not directly.
  • Add a smoke system test - probably using the ERROR path, as triggering a timeout would be too difficult on reasonably fast connection (we don't want to DOS the infrastructure, after all).
    Added a unit test covering the same, just without hitting the backend (doesn't matter for the flow controller, as this is past it in the pipeline).

@plamut plamut marked this pull request as ready for review May 12, 2020 09:06
@plamut plamut requested review from pradn and kamalaboulhosn May 12, 2020 09:06
@plamut
Copy link
Contributor Author

plamut commented May 28, 2020

@pradn Do you have anything to add here, or are we good to merge?

error_msg = "Flow control limits too low for the message - {}.".format(
load_info
)
raise exceptions.PermanentlyBlockedError(error_msg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"WouldPermanentlyBlockError" makes more sense, because it's not already permanently blocked.

Even better, I think we can simply omit the details about blocking and say something more generic: FlowControlMessageTooLargeError

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I will rename the exception.

Copy link
Contributor Author

@plamut plamut May 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I eventually opted for the generic FlowControlLimitError for a few reasons:

  • FlowControlMessageTooLargeError could be confusing, as there already exists a similarly named MessageTooLargeError for the messages that would exceed the limits on the backend.
  • A message could block permanently for reasons other than its size, e.g. if flow control's max messages is set to 0 or some other non-sensible value. Mentioning the size would actually be misleading in such case, but a generic FlowControlLimitsError would semantically still be correct.
  • One less exception type to keep track of.

google/cloud/pubsub_v1/publisher/flow_controller.py Outdated Show resolved Hide resolved
@plamut
Copy link
Contributor Author

plamut commented Jun 2, 2020

@pradn If there's nothing else left, please just give an approval to unblock the merge, thanks! :)

@plamut plamut merged commit 06085c4 into googleapis:master Jun 2, 2020
@plamut plamut deleted the iss-72 branch June 2, 2020 19:09
gcf-merge-on-green bot pushed a commit that referenced this pull request Jun 9, 2020
## [1.6.0](https://www.github.com/googleapis/python-pubsub/compare/v1.5.0...v1.6.0) (2020-06-09)

### Features

* Add flow control for message publishing ([#96](https://www.github.com/googleapis/python-pubsub/issues/96)) ([06085c4](https://www.github.com/googleapis/python-pubsub/commit/06085c4083b9dccdd50383257799904510bbf3a0))


### Bug Fixes

* Fix PubSub incompatibility with api-core 1.17.0+ ([#103](https://www.github.com/googleapis/python-pubsub/issues/103)) ([c02060f](https://www.github.com/googleapis/python-pubsub/commit/c02060fbbe6e2ca4664bee08d2de10665d41dc0b))


### Documentation
- Clarify that Schedulers shouldn't be used with multiple SubscriberClients ([#100](#100)) ([cf9e87c](cf9e87c))
- Fix update subscription/snapshot/topic samples ([#113](#113)) ([e62c38b](e62c38b))


### Internal / Testing Changes
- Re-generated service implementaton using synth: removed experimental notes from the RetryPolicy and filtering features in anticipation of GA, added DetachSubscription (experimental) ([#114](#114)) ([0132a46](0132a46))
- Incorporate will_accept() checks into publish() ([#108](#108)) ([6c7677e](6c7677e))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla: yes This human has signed the Contributor License Agreement. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.
Projects
None yet
4 participants