diff --git a/api/envoy/extensions/filters/network/thrift_proxy/v3/thrift_proxy.proto b/api/envoy/extensions/filters/network/thrift_proxy/v3/thrift_proxy.proto index 659ee1460440..512f810e4139 100644 --- a/api/envoy/extensions/filters/network/thrift_proxy/v3/thrift_proxy.proto +++ b/api/envoy/extensions/filters/network/thrift_proxy/v3/thrift_proxy.proto @@ -5,6 +5,7 @@ package envoy.extensions.filters.network.thrift_proxy.v3; import "envoy/extensions/filters/network/thrift_proxy/v3/route.proto"; import "google/protobuf/any.proto"; +import "google/protobuf/wrappers.proto"; import "udpa/annotations/status.proto"; import "udpa/annotations/versioning.proto"; @@ -57,7 +58,7 @@ enum ProtocolType { TWITTER = 4; } -// [#next-free-field: 7] +// [#next-free-field: 8] message ThriftProxy { option (udpa.annotations.versioning).previous_message_type = "envoy.config.filter.network.thrift_proxy.v2alpha1.ThriftProxy"; @@ -88,6 +89,9 @@ message ThriftProxy { // is the same, the transport type is framed and the protocol is not Twitter. Otherwise Envoy will // fallback to decode the data. bool payload_passthrough = 6; + + // Optional maximum requests for a single downstream connection. If not specified, there is no limit. + google.protobuf.UInt32Value max_requests_per_connection = 7; } // ThriftFilter configures a Thrift filter. diff --git a/api/envoy/extensions/filters/network/thrift_proxy/v4alpha/thrift_proxy.proto b/api/envoy/extensions/filters/network/thrift_proxy/v4alpha/thrift_proxy.proto index 183420227236..4ae4e26e4dbf 100644 --- a/api/envoy/extensions/filters/network/thrift_proxy/v4alpha/thrift_proxy.proto +++ b/api/envoy/extensions/filters/network/thrift_proxy/v4alpha/thrift_proxy.proto @@ -5,6 +5,7 @@ package envoy.extensions.filters.network.thrift_proxy.v4alpha; import "envoy/extensions/filters/network/thrift_proxy/v4alpha/route.proto"; import "google/protobuf/any.proto"; +import "google/protobuf/wrappers.proto"; import "udpa/annotations/status.proto"; import "udpa/annotations/versioning.proto"; @@ -57,7 +58,7 @@ enum ProtocolType { TWITTER = 4; } -// [#next-free-field: 7] +// [#next-free-field: 8] message ThriftProxy { option (udpa.annotations.versioning).previous_message_type = "envoy.extensions.filters.network.thrift_proxy.v3.ThriftProxy"; @@ -88,6 +89,9 @@ message ThriftProxy { // is the same, the transport type is framed and the protocol is not Twitter. Otherwise Envoy will // fallback to decode the data. bool payload_passthrough = 6; + + // Optional maximum requests for a single downstream connection. If not specified, there is no limit. + google.protobuf.UInt32Value max_requests_per_connection = 7; } // ThriftFilter configures a Thrift filter. diff --git a/docs/root/configuration/listeners/network_filters/thrift_proxy_filter.rst b/docs/root/configuration/listeners/network_filters/thrift_proxy_filter.rst index 504f6f873752..94972ebe3112 100644 --- a/docs/root/configuration/listeners/network_filters/thrift_proxy_filter.rst +++ b/docs/root/configuration/listeners/network_filters/thrift_proxy_filter.rst @@ -15,6 +15,14 @@ keyed by `envoy.filters.network.thrift_proxy`. The :ref:`ThriftProtocolOptions` message describes the available options. +Downstream Requests Limit +------------------------- +Thrift Proxy can set the +:ref:`maximum number of requests` +that each downstream connection can handle. When the number of requests exceeds the connection limit, Thrift Proxy will +actively disconnect from the Thrift client. + + Thrift Request Metadata ----------------------- diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index c3c7657298ac..6aa9e13dfd5b 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -116,6 +116,7 @@ New Features * tcp_proxy: add support for converting raw TCP streams into HTTP/1.1 CONNECT requests. See :ref:`upgrade documentation ` for details. * tcp_proxy: added a :ref:`use_post field ` for using HTTP POST to proxy TCP streams. * tcp_proxy: added a :ref:`headers_to_add field ` for setting additional headers to the HTTP requests for TCP proxing. +* thrift_proxy: added a :ref:`max_requests_per_connection field ` for setting maximum requests for per downstream connection. Deprecated ---------- diff --git a/generated_api_shadow/envoy/extensions/filters/network/thrift_proxy/v3/thrift_proxy.proto b/generated_api_shadow/envoy/extensions/filters/network/thrift_proxy/v3/thrift_proxy.proto index 8485693caaf9..3083973f3da2 100644 --- a/generated_api_shadow/envoy/extensions/filters/network/thrift_proxy/v3/thrift_proxy.proto +++ b/generated_api_shadow/envoy/extensions/filters/network/thrift_proxy/v3/thrift_proxy.proto @@ -6,6 +6,7 @@ import "envoy/extensions/filters/network/thrift_proxy/v3/route.proto"; import "google/protobuf/any.proto"; import "google/protobuf/struct.proto"; +import "google/protobuf/wrappers.proto"; import "udpa/annotations/status.proto"; import "udpa/annotations/versioning.proto"; @@ -58,7 +59,7 @@ enum ProtocolType { TWITTER = 4; } -// [#next-free-field: 7] +// [#next-free-field: 8] message ThriftProxy { option (udpa.annotations.versioning).previous_message_type = "envoy.config.filter.network.thrift_proxy.v2alpha1.ThriftProxy"; @@ -89,6 +90,9 @@ message ThriftProxy { // is the same, the transport type is framed and the protocol is not Twitter. Otherwise Envoy will // fallback to decode the data. bool payload_passthrough = 6; + + // Optional maximum requests for a single downstream connection. If not specified, there is no limit. + google.protobuf.UInt32Value max_requests_per_connection = 7; } // ThriftFilter configures a Thrift filter. diff --git a/generated_api_shadow/envoy/extensions/filters/network/thrift_proxy/v4alpha/thrift_proxy.proto b/generated_api_shadow/envoy/extensions/filters/network/thrift_proxy/v4alpha/thrift_proxy.proto index 183420227236..4ae4e26e4dbf 100644 --- a/generated_api_shadow/envoy/extensions/filters/network/thrift_proxy/v4alpha/thrift_proxy.proto +++ b/generated_api_shadow/envoy/extensions/filters/network/thrift_proxy/v4alpha/thrift_proxy.proto @@ -5,6 +5,7 @@ package envoy.extensions.filters.network.thrift_proxy.v4alpha; import "envoy/extensions/filters/network/thrift_proxy/v4alpha/route.proto"; import "google/protobuf/any.proto"; +import "google/protobuf/wrappers.proto"; import "udpa/annotations/status.proto"; import "udpa/annotations/versioning.proto"; @@ -57,7 +58,7 @@ enum ProtocolType { TWITTER = 4; } -// [#next-free-field: 7] +// [#next-free-field: 8] message ThriftProxy { option (udpa.annotations.versioning).previous_message_type = "envoy.extensions.filters.network.thrift_proxy.v3.ThriftProxy"; @@ -88,6 +89,9 @@ message ThriftProxy { // is the same, the transport type is framed and the protocol is not Twitter. Otherwise Envoy will // fallback to decode the data. bool payload_passthrough = 6; + + // Optional maximum requests for a single downstream connection. If not specified, there is no limit. + google.protobuf.UInt32Value max_requests_per_connection = 7; } // ThriftFilter configures a Thrift filter. diff --git a/source/extensions/filters/network/thrift_proxy/config.cc b/source/extensions/filters/network/thrift_proxy/config.cc index 8c7a967597c4..dd835c7cf48d 100644 --- a/source/extensions/filters/network/thrift_proxy/config.cc +++ b/source/extensions/filters/network/thrift_proxy/config.cc @@ -122,7 +122,8 @@ ConfigImpl::ConfigImpl( stats_(ThriftFilterStats::generateStats(stats_prefix_, context_.scope())), transport_(lookupTransport(config.transport())), proto_(lookupProtocol(config.protocol())), route_matcher_(new Router::RouteMatcher(config.route_config())), - payload_passthrough_(config.payload_passthrough()) { + payload_passthrough_(config.payload_passthrough()), + max_requests_per_connection_(config.max_requests_per_connection().value()) { if (config.thrift_filters().empty()) { ENVOY_LOG(debug, "using default router filter"); diff --git a/source/extensions/filters/network/thrift_proxy/config.h b/source/extensions/filters/network/thrift_proxy/config.h index 02c1fcf4d13a..de2cd2bb4e29 100644 --- a/source/extensions/filters/network/thrift_proxy/config.h +++ b/source/extensions/filters/network/thrift_proxy/config.h @@ -82,6 +82,7 @@ class ConfigImpl : public Config, ProtocolPtr createProtocol() override; Router::Config& routerConfig() override { return *this; } bool payloadPassthrough() const override { return payload_passthrough_; } + uint64_t maxRequestsPerConnection() const override { return max_requests_per_connection_; } private: void processFilter( @@ -96,6 +97,8 @@ class ConfigImpl : public Config, std::list filter_factories_; const bool payload_passthrough_; + + const uint64_t max_requests_per_connection_{}; }; } // namespace ThriftProxy diff --git a/source/extensions/filters/network/thrift_proxy/conn_manager.cc b/source/extensions/filters/network/thrift_proxy/conn_manager.cc index d3e23b5d29cc..122eaabbdedf 100644 --- a/source/extensions/filters/network/thrift_proxy/conn_manager.cc +++ b/source/extensions/filters/network/thrift_proxy/conn_manager.cc @@ -55,6 +55,11 @@ void ConnectionManager::dispatch() { return; } + if (requests_overflow_) { + ENVOY_CONN_LOG(debug, "thrift filter requests overflow", read_callbacks_->connection()); + return; + } + try { bool underflow = false; while (!underflow) { @@ -139,6 +144,9 @@ void ConnectionManager::continueDecoding() { void ConnectionManager::doDeferredRpcDestroy(ConnectionManager::ActiveRpc& rpc) { read_callbacks_->connection().dispatcher().deferredDelete(rpc.removeFromList(rpcs_)); + if (requests_overflow_ && rpcs_.empty()) { + read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite); + } } void ConnectionManager::resetAllRpcs(bool local_reset) { @@ -404,6 +412,14 @@ void ConnectionManager::ActiveRpc::finalizeRequest() { parent_.stats_.request_.inc(); + parent_.accumulated_requests_++; + if (parent_.config_.maxRequestsPerConnection() > 0 && + parent_.accumulated_requests_ >= parent_.config_.maxRequestsPerConnection()) { + parent_.read_callbacks_->connection().readDisable(true); + parent_.requests_overflow_ = true; + parent_.stats_.downstream_cx_max_requests_.inc(); + } + bool destroy_rpc = false; switch (original_msg_type_) { case MessageType::Call: diff --git a/source/extensions/filters/network/thrift_proxy/conn_manager.h b/source/extensions/filters/network/thrift_proxy/conn_manager.h index 12bc08347150..af41b6c4d6c7 100644 --- a/source/extensions/filters/network/thrift_proxy/conn_manager.h +++ b/source/extensions/filters/network/thrift_proxy/conn_manager.h @@ -40,6 +40,7 @@ class Config { virtual ProtocolPtr createProtocol() PURE; virtual Router::Config& routerConfig() PURE; virtual bool payloadPassthrough() const PURE; + virtual uint64_t maxRequestsPerConnection() const PURE; }; /** @@ -272,6 +273,10 @@ class ConnectionManager : public Network::ReadFilter, bool stopped_{false}; bool half_closed_{false}; TimeSource& time_source_; + + // The number of requests accumulated on the current connection. + uint64_t accumulated_requests_{}; + bool requests_overflow_{false}; }; } // namespace ThriftProxy diff --git a/source/extensions/filters/network/thrift_proxy/stats.h b/source/extensions/filters/network/thrift_proxy/stats.h index 9166f37be6ca..7e57db76f803 100644 --- a/source/extensions/filters/network/thrift_proxy/stats.h +++ b/source/extensions/filters/network/thrift_proxy/stats.h @@ -16,6 +16,7 @@ namespace ThriftProxy { #define ALL_THRIFT_FILTER_STATS(COUNTER, GAUGE, HISTOGRAM) \ COUNTER(cx_destroy_local_with_active_rq) \ COUNTER(cx_destroy_remote_with_active_rq) \ + COUNTER(downstream_cx_max_requests) \ COUNTER(request) \ COUNTER(request_call) \ COUNTER(request_decoding_error) \ diff --git a/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc b/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc index 3764bbd44573..d02028fd90e0 100644 --- a/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc +++ b/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc @@ -120,6 +120,33 @@ class ThriftConnectionManagerTest : public testing::Test { filter_->onBelowWriteBufferLowWatermark(); } + // Return the number of requests actually sent. + uint32_t sendRequests(uint32_t request_number) { + for (uint32_t i = 0; i < request_number; i++) { + writeComplexFramedBinaryMessage(buffer_, MessageType::Call, 0x0F); + writeComplexFramedBinaryMessage(write_buffer_, MessageType::Reply, 0x0F); + + ThriftFilters::DecoderFilterCallbacks* callbacks{}; + ON_CALL(*decoder_filter_, setDecoderFilterCallbacks(_)) + .WillByDefault( + Invoke([&](ThriftFilters::DecoderFilterCallbacks& cb) -> void { callbacks = &cb; })); + + EXPECT_EQ(filter_->onData(buffer_, false), Network::FilterStatus::StopIteration); + + if (!callbacks) { + return i; + } + + FramedTransportImpl transport; + BinaryProtocolImpl proto; + callbacks->startUpstreamResponse(transport, proto); + + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_)); + EXPECT_EQ(ThriftFilters::ResponseStatus::Complete, callbacks->upstreamData(write_buffer_)); + } + return request_number; + } + void writeMessage(Buffer::Instance& buffer, TransportType transport_type, ProtocolType protocol_type, MessageType msg_type, int32_t seq_id) { Buffer::OwnedImpl msg; @@ -1058,6 +1085,178 @@ TEST_F(ThriftConnectionManagerTest, ResetDownstreamConnection) { EXPECT_EQ(0U, stats_.request_active_.value()); } +// Test the base case where there is no limit on the number of requests. +TEST_F(ThriftConnectionManagerTest, RequestWithNoMaxRequestsLimit) { + initializeFilter(""); + EXPECT_EQ(0, config_->maxRequestsPerConnection()); + + EXPECT_EQ(50, sendRequests(50)); + + filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); + + EXPECT_EQ(0U, store_.counter("test.downstream_cx_max_requests").value()); + EXPECT_EQ(50U, store_.counter("test.request").value()); + EXPECT_EQ(50U, store_.counter("test.request_call").value()); + EXPECT_EQ(0U, stats_.request_active_.value()); + EXPECT_EQ(50U, store_.counter("test.response").value()); + EXPECT_EQ(50U, store_.counter("test.response_reply").value()); + EXPECT_EQ(0U, store_.counter("test.response_exception").value()); + EXPECT_EQ(0U, store_.counter("test.response_invalid_type").value()); + EXPECT_EQ(50U, store_.counter("test.response_success").value()); + EXPECT_EQ(0U, store_.counter("test.response_error").value()); +} + +// Test the case where there is a limit on the number of requests but the actual number of requests +// does not reach the limit. +TEST_F(ThriftConnectionManagerTest, RequestWithMaxRequestsLimitButNotReach) { + const std::string yaml = R"EOF( + stat_prefix: test + route_config: + name: local_route + max_requests_per_connection: 50 + )EOF"; + + initializeFilter(yaml); + EXPECT_EQ(50, config_->maxRequestsPerConnection()); + + EXPECT_EQ(49, sendRequests(49)); + + filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); + + EXPECT_EQ(0U, store_.counter("test.downstream_cx_max_requests").value()); + EXPECT_EQ(49U, store_.counter("test.request").value()); + EXPECT_EQ(49U, store_.counter("test.request_call").value()); + EXPECT_EQ(0U, stats_.request_active_.value()); + EXPECT_EQ(49U, store_.counter("test.response").value()); + EXPECT_EQ(49U, store_.counter("test.response_reply").value()); + EXPECT_EQ(0U, store_.counter("test.response_exception").value()); + EXPECT_EQ(0U, store_.counter("test.response_invalid_type").value()); + EXPECT_EQ(49U, store_.counter("test.response_success").value()); + EXPECT_EQ(0U, store_.counter("test.response_error").value()); +} + +// Test the case where there is a limit on the number of requests and the actual number of requests +// happens to reach the limit. +TEST_F(ThriftConnectionManagerTest, RequestWithMaxRequestsLimitAndReached) { + const std::string yaml = R"EOF( + stat_prefix: test + route_config: + name: local_route + max_requests_per_connection: 50 + )EOF"; + + initializeFilter(yaml); + EXPECT_EQ(50, config_->maxRequestsPerConnection()); + + // Since max requests per connection is set to 50, the connection will be disconnected after + // all 50 requests is completed. + EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::FlushWrite)); + + EXPECT_EQ(50, sendRequests(50)); + + filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); + + EXPECT_EQ(1U, store_.counter("test.downstream_cx_max_requests").value()); + EXPECT_EQ(50U, store_.counter("test.request").value()); + EXPECT_EQ(50U, store_.counter("test.request_call").value()); + EXPECT_EQ(0U, stats_.request_active_.value()); + EXPECT_EQ(50U, store_.counter("test.response").value()); + EXPECT_EQ(50U, store_.counter("test.response_reply").value()); + EXPECT_EQ(0U, store_.counter("test.response_exception").value()); + EXPECT_EQ(0U, store_.counter("test.response_invalid_type").value()); + EXPECT_EQ(50U, store_.counter("test.response_success").value()); + EXPECT_EQ(0U, store_.counter("test.response_error").value()); +} + +// Test the case where there is a limit on the number of requests and the actual number of requests +// exceeds the limit. +TEST_F(ThriftConnectionManagerTest, RequestWithMaxRequestsLimitAndReachedWithMoreRequests) { + const std::string yaml = R"EOF( + stat_prefix: test + route_config: + name: local_route + max_requests_per_connection: 50 + )EOF"; + + initializeFilter(yaml); + EXPECT_EQ(50, config_->maxRequestsPerConnection()); + + // Since max requests per connection is set to 50, the connection will be disconnected after + // all 50 requests is completed. + EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::FlushWrite)); + + EXPECT_EQ(50, sendRequests(55)); + + filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); + + EXPECT_EQ(1U, store_.counter("test.downstream_cx_max_requests").value()); + EXPECT_EQ(50U, store_.counter("test.request").value()); + EXPECT_EQ(50U, store_.counter("test.request_call").value()); + EXPECT_EQ(0U, stats_.request_active_.value()); + EXPECT_EQ(50U, store_.counter("test.response").value()); + EXPECT_EQ(50U, store_.counter("test.response_reply").value()); + EXPECT_EQ(0U, store_.counter("test.response_exception").value()); + EXPECT_EQ(0U, store_.counter("test.response_invalid_type").value()); + EXPECT_EQ(50U, store_.counter("test.response_success").value()); + EXPECT_EQ(0U, store_.counter("test.response_error").value()); +} + +// Test cases where the number of requests is limited and the actual number of requests exceeds the +// limit several times. +TEST_F(ThriftConnectionManagerTest, RequestWithMaxRequestsLimitAndReachedRepeatedly) { + const std::string yaml = R"EOF( + stat_prefix: test + route_config: + name: local_route + max_requests_per_connection: 5 + )EOF"; + + initializeFilter(yaml); + EXPECT_EQ(5, config_->maxRequestsPerConnection()); + + EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::FlushWrite)) + .Times(5); + + auto mock_new_connection = [this]() { + filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); + + filter_ = nullptr; + + filter_callbacks_.connection_.read_enabled_ = true; + filter_callbacks_.connection_.state_ = Network::Connection::State::Open; + filter_callbacks_.connection_.callbacks_.clear(); + + ON_CALL(random_, random()).WillByDefault(Return(42)); + filter_ = std::make_unique( + *config_, random_, filter_callbacks_.connection_.dispatcher_.timeSource()); + filter_->initializeReadFilterCallbacks(filter_callbacks_); + filter_->onNewConnection(); + + filter_->onAboveWriteBufferHighWatermark(); + filter_->onBelowWriteBufferLowWatermark(); + }; + + EXPECT_EQ(5, sendRequests(6)); + + for (size_t i = 0; i < 4; i++) { + mock_new_connection(); + EXPECT_EQ(5, sendRequests(6)); + } + + filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList(); + + EXPECT_EQ(5U, store_.counter("test.downstream_cx_max_requests").value()); + EXPECT_EQ(25U, store_.counter("test.request").value()); + EXPECT_EQ(25U, store_.counter("test.request_call").value()); + EXPECT_EQ(0U, stats_.request_active_.value()); + EXPECT_EQ(25U, store_.counter("test.response").value()); + EXPECT_EQ(25U, store_.counter("test.response_reply").value()); + EXPECT_EQ(0U, store_.counter("test.response_exception").value()); + EXPECT_EQ(0U, store_.counter("test.response_invalid_type").value()); + EXPECT_EQ(25U, store_.counter("test.response_success").value()); + EXPECT_EQ(0U, store_.counter("test.response_error").value()); +} + TEST_F(ThriftConnectionManagerTest, DownstreamProtocolUpgrade) { custom_transport_ = new NiceMock(); custom_protocol_ = new NiceMock(); @@ -1179,7 +1378,8 @@ TEST_F(ThriftConnectionManagerTest, OnDataResumesWithNextFilter) { EXPECT_EQ(1U, stats_.request_active_.value()); } -// Tests stop iteration/resume with multiple filters when iteration is stopped during transportEnd. +// Tests stop iteration/resume with multiple filters when iteration is stopped during +// transportEnd. TEST_F(ThriftConnectionManagerTest, OnDataResumesWithNextFilterOnTransportEnd) { auto* filter = new NiceMock(); custom_filter_.reset(filter);