Skip to content

Commit

Permalink
thrift: supported setting max requests for per downstream connection (#…
Browse files Browse the repository at this point in the history
…15125)

Support setting max requests per downstream connection. By setting max_requests_per_connection, Envoy can actively disconnect the thrift client after reaching a certain number of requests.
Due to the limitations of the thrift protocol itself, we cannot achieve a clean disconnection.

Risk Level: Normal
Testing: Added
Docs Changes: Added
Release Notes: Added
Fixes: #14560

Signed-off-by: wbpcode <[email protected]>
  • Loading branch information
wbpcode authored Mar 2, 2021
1 parent 15e3b9d commit 8a2685e
Show file tree
Hide file tree
Showing 12 changed files with 257 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package envoy.extensions.filters.network.thrift_proxy.v3;
import "envoy/extensions/filters/network/thrift_proxy/v3/route.proto";

import "google/protobuf/any.proto";
import "google/protobuf/wrappers.proto";

import "udpa/annotations/status.proto";
import "udpa/annotations/versioning.proto";
Expand Down Expand Up @@ -57,7 +58,7 @@ enum ProtocolType {
TWITTER = 4;
}

// [#next-free-field: 7]
// [#next-free-field: 8]
message ThriftProxy {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.filter.network.thrift_proxy.v2alpha1.ThriftProxy";
Expand Down Expand Up @@ -88,6 +89,9 @@ message ThriftProxy {
// is the same, the transport type is framed and the protocol is not Twitter. Otherwise Envoy will
// fallback to decode the data.
bool payload_passthrough = 6;

// Optional maximum requests for a single downstream connection. If not specified, there is no limit.
google.protobuf.UInt32Value max_requests_per_connection = 7;
}

// ThriftFilter configures a Thrift filter.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ keyed by `envoy.filters.network.thrift_proxy`. The
:ref:`ThriftProtocolOptions<envoy_v3_api_msg_extensions.filters.network.thrift_proxy.v3.ThriftProtocolOptions>`
message describes the available options.

Downstream Requests Limit
-------------------------
Thrift Proxy can set the
:ref:`maximum number of requests<envoy_v3_api_field_extensions.filters.network.thrift_proxy.v3.ThriftProxy.max_requests_per_connection>`
that each downstream connection can handle. When the number of requests exceeds the connection limit, Thrift Proxy will
actively disconnect from the Thrift client.


Thrift Request Metadata
-----------------------

Expand Down
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ New Features
* tcp_proxy: add support for converting raw TCP streams into HTTP/1.1 CONNECT requests. See :ref:`upgrade documentation <tunneling-tcp-over-http>` for details.
* tcp_proxy: added a :ref:`use_post field <envoy_v3_api_field_extensions.filters.network.tcp_proxy.v3.TcpProxy.TunnelingConfig.use_post>` for using HTTP POST to proxy TCP streams.
* tcp_proxy: added a :ref:`headers_to_add field <envoy_v3_api_field_extensions.filters.network.tcp_proxy.v3.TcpProxy.TunnelingConfig.headers_to_add>` for setting additional headers to the HTTP requests for TCP proxing.
* thrift_proxy: added a :ref:`max_requests_per_connection field <envoy_v3_api_field_extensions.filters.network.thrift_proxy.v3.ThriftProxy.max_requests_per_connection>` for setting maximum requests for per downstream connection.

Deprecated
----------

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion source/extensions/filters/network/thrift_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ ConfigImpl::ConfigImpl(
stats_(ThriftFilterStats::generateStats(stats_prefix_, context_.scope())),
transport_(lookupTransport(config.transport())), proto_(lookupProtocol(config.protocol())),
route_matcher_(new Router::RouteMatcher(config.route_config())),
payload_passthrough_(config.payload_passthrough()) {
payload_passthrough_(config.payload_passthrough()),
max_requests_per_connection_(config.max_requests_per_connection().value()) {

if (config.thrift_filters().empty()) {
ENVOY_LOG(debug, "using default router filter");
Expand Down
3 changes: 3 additions & 0 deletions source/extensions/filters/network/thrift_proxy/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class ConfigImpl : public Config,
ProtocolPtr createProtocol() override;
Router::Config& routerConfig() override { return *this; }
bool payloadPassthrough() const override { return payload_passthrough_; }
uint64_t maxRequestsPerConnection() const override { return max_requests_per_connection_; }

private:
void processFilter(
Expand All @@ -96,6 +97,8 @@ class ConfigImpl : public Config,

std::list<ThriftFilters::FilterFactoryCb> filter_factories_;
const bool payload_passthrough_;

const uint64_t max_requests_per_connection_{};
};

} // namespace ThriftProxy
Expand Down
16 changes: 16 additions & 0 deletions source/extensions/filters/network/thrift_proxy/conn_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ void ConnectionManager::dispatch() {
return;
}

if (requests_overflow_) {
ENVOY_CONN_LOG(debug, "thrift filter requests overflow", read_callbacks_->connection());
return;
}

try {
bool underflow = false;
while (!underflow) {
Expand Down Expand Up @@ -139,6 +144,9 @@ void ConnectionManager::continueDecoding() {

void ConnectionManager::doDeferredRpcDestroy(ConnectionManager::ActiveRpc& rpc) {
read_callbacks_->connection().dispatcher().deferredDelete(rpc.removeFromList(rpcs_));
if (requests_overflow_ && rpcs_.empty()) {
read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
}
}

void ConnectionManager::resetAllRpcs(bool local_reset) {
Expand Down Expand Up @@ -404,6 +412,14 @@ void ConnectionManager::ActiveRpc::finalizeRequest() {

parent_.stats_.request_.inc();

parent_.accumulated_requests_++;
if (parent_.config_.maxRequestsPerConnection() > 0 &&
parent_.accumulated_requests_ >= parent_.config_.maxRequestsPerConnection()) {
parent_.read_callbacks_->connection().readDisable(true);
parent_.requests_overflow_ = true;
parent_.stats_.downstream_cx_max_requests_.inc();
}

bool destroy_rpc = false;
switch (original_msg_type_) {
case MessageType::Call:
Expand Down
5 changes: 5 additions & 0 deletions source/extensions/filters/network/thrift_proxy/conn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class Config {
virtual ProtocolPtr createProtocol() PURE;
virtual Router::Config& routerConfig() PURE;
virtual bool payloadPassthrough() const PURE;
virtual uint64_t maxRequestsPerConnection() const PURE;
};

/**
Expand Down Expand Up @@ -272,6 +273,10 @@ class ConnectionManager : public Network::ReadFilter,
bool stopped_{false};
bool half_closed_{false};
TimeSource& time_source_;

// The number of requests accumulated on the current connection.
uint64_t accumulated_requests_{};
bool requests_overflow_{false};
};

} // namespace ThriftProxy
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/network/thrift_proxy/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace ThriftProxy {
#define ALL_THRIFT_FILTER_STATS(COUNTER, GAUGE, HISTOGRAM) \
COUNTER(cx_destroy_local_with_active_rq) \
COUNTER(cx_destroy_remote_with_active_rq) \
COUNTER(downstream_cx_max_requests) \
COUNTER(request) \
COUNTER(request_call) \
COUNTER(request_decoding_error) \
Expand Down
Loading

0 comments on commit 8a2685e

Please sign in to comment.