Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix PRS from being blocked because of misbehaving clients #15339

Merged
merged 13 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/20.0/20.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- [Delete with Subquery Support](#delete-subquery)
- **[Flag changes](#flag-changes)**
- [`pprof-http` default change](#pprof-http-default)
- [`shutdown_grace_period` Default Change](#shutdown-grace-period-default)
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
- **[Minor Changes](#minor-changes)**
- **[New Stats](#new-stats)**
- [VTTablet Query Cache Hits and Misses](#vttablet-query-cache-hits-and-misses)
Expand Down Expand Up @@ -61,6 +62,10 @@ The `--pprof-http` flag, which was introduced in v19 with a default of `true`, h
This makes HTTP `pprof` endpoints now an *opt-in* feature, rather than opt-out.
To continue enabling these endpoints, explicitly set `--pprof-http` when starting up Vitess components.

#### <a id="shutdown-grace-period-default"/> `shutdown_grace_period` Default Change

The `--shutdown_grace_period` flag, which was introduced in v2 with a default of `0 seconds`, has now been changed to default to `3 seconds`.
This makes reparenting in Vitess resilient to client errors, and prevents PlannedReparentShard from timing out.
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved

## <a id="minor-changes"/>Minor Changes

Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ Flags:
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
--shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown.
--shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown. (default 3s)
--sql-max-length-errors int truncate queries in error logs to the given length (default unlimited)
--sql-max-length-ui int truncate queries in debug UIs to the given length (default 512) (default 512)
--srv_topo_cache_refresh duration how frequently to refresh the topology for cached entries (default 1s)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ Flags:
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
--shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown.
--shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown. (default 3s)
--sql-max-length-errors int truncate queries in error logs to the given length (default unlimited)
--sql-max-length-ui int truncate queries in debug UIs to the given length (default 512) (default 512)
--srv_topo_cache_refresh duration how frequently to refresh the topology for cached entries (default 1s)
Expand Down
15 changes: 9 additions & 6 deletions go/test/endtoend/tabletgateway/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package healthcheck

import (
"flag"
"fmt"
"os"
"testing"

Expand All @@ -26,11 +27,12 @@ import (
)

var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
keyspaceName = "commerce"
cell = "zone1"
sqlSchema = `create table product(
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
keyspaceName = "commerce"
vtgateGrpcAddress string
cell = "zone1"
sqlSchema = `create table product(
sku varbinary(128),
description varbinary(128),
price bigint,
Expand Down Expand Up @@ -64,7 +66,7 @@ func TestMain(m *testing.M) {

exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, "localhost")
clusterInstance.VtTabletExtraArgs = []string{"--health_check_interval", "1s"}
clusterInstance.VtTabletExtraArgs = []string{"--health_check_interval", "1s", "--shutdown_grace_period", "3s"}
defer clusterInstance.Teardown()

// Start topo server
Expand Down Expand Up @@ -96,6 +98,7 @@ func TestMain(m *testing.M) {
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort)
return m.Run()
}()
os.Exit(exitCode)
Expand Down
39 changes: 39 additions & 0 deletions go/test/endtoend/tabletgateway/vtgate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
vtorcutils "vitess.io/vitess/go/test/endtoend/vtorc/utils"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -283,6 +284,44 @@ func TestReplicaTransactions(t *testing.T) {
assert.Equal(t, `[[INT64(1) VARCHAR("email1")] [INT64(2) VARCHAR("email2")]]`, fmt.Sprintf("%v", qr4.Rows), "we are not able to reconnect after restart")
}

// TestStreamingRPCStuck tests that StreamExecute calls don't get stuck on the vttablets if a client stop reading from a stream.
func TestStreamingRPCStuck(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
vtConn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer vtConn.Close()

// We want the table to have enough rows such that a streaming call returns multiple packets.
// Therefore, we insert one row and keep doubling it.
utils.Exec(t, vtConn, "insert into customer(email) values('testemail')")
for i := 0; i < 15; i++ {
// Double the number of rows in customer table.
utils.Exec(t, vtConn, "insert into customer (email) select email from customer")
}

// Connect to vtgate and run a streaming query.
vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "test_user", "")
require.NoError(t, err)
stream, err := vtgateConn.Session("", &querypb.ExecuteOptions{}).StreamExecute(ctx, "select * from customer", map[string]*querypb.BindVariable{})
require.NoError(t, err)

// We read packets until we see the first set of results. This ensures that the stream is working.
for {
res, err := stream.Recv()
require.NoError(t, err)
if res != nil && len(res.Rows) > 0 {
// breaking here stops reading from the stream.
break
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
}
}

// We simulate a misbehaving client that doesn't read from the stream anymore.
// This however shouldn't block PlannedReparentShard calls.
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, "0", clusterInstance.Keyspaces[0].Shards[0].Vttablets[1].Alias)
require.NoError(t, err)
}

func getMapFromJSON(JSON map[string]any, key string) map[string]any {
result := make(map[string]any)
object := reflect.ValueOf(JSON[key])
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/vtgate/transaction/restart/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func TestMain(m *testing.M) {
Name: keyspaceName,
SchemaSQL: schemaSQL,
}
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--shutdown_grace_period=0s",
)
err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false)
if err != nil {
return 1
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1117,7 +1117,7 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction

// Add query detail object into QueryExecutor TableServer list w.r.t if it is a transactional or not. Previously we were adding it
// to olapql list regardless but that resulted in problems, where long-running stream queries which can be stateful (or transactional)
// weren't getting cleaned up during unserveCommon>handleShutdownGracePeriod in state_manager.go.
// weren't getting cleaned up during unserveCommon>terminateAllQueries in state_manager.go.
// This change will ensure that long-running streaming stateful queries get gracefully shutdown during ServingTypeChange
// once their grace period is over.
qd := NewQueryDetail(qre.logStats.Ctx, conn.Conn)
Expand Down
83 changes: 83 additions & 0 deletions go/vt/vttablet/tabletserver/requests_waiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tabletserver

import "sync"

// requestsWaiter is used to wait for requests. It stores the count of the requests pending,
// and also the number of waiters currently waiting. It has a mutex as well to protects its fields.
type requestsWaiter struct {
mu sync.Mutex
wg sync.WaitGroup
// waitCounter is the number of goroutines that are waiting for wg to be empty.
// If this value is greater than zero, then we have to ensure that we don't Add to the requests
// to avoid any panics in the wait.
waitCounter int
// counter is the count of the number of outstanding requests.
counter int
}

// newRequestsWaiter creates a new requestsWaiter.
func newRequestsWaiter() *requestsWaiter {
return &requestsWaiter{
mu: sync.Mutex{},
wg: sync.WaitGroup{},
waitCounter: 0,
counter: 0,
}
}
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved

// Add adds to the requestsWaiter.
func (r *requestsWaiter) Add(val int) {
r.mu.Lock()
defer r.mu.Unlock()
r.counter += val
r.wg.Add(val)
}

// Done subtracts 1 from the requestsWaiter.
func (r *requestsWaiter) Done() {
r.Add(-1)
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
}

// addToWaitCounter adds to the waitCounter while being protected by a mutex.
func (r *requestsWaiter) addToWaitCounter(val int) {
r.mu.Lock()
defer r.mu.Unlock()
r.waitCounter += val
}

// WaitToBeEmpty waits for requests to be empty. It also increments and decrements the waitCounter as required.
func (r *requestsWaiter) WaitToBeEmpty() {
r.addToWaitCounter(1)
r.wg.Wait()
r.addToWaitCounter(-1)
}

// GetWaiterCount gets the number of go routines currently waiting on the wait group.
func (r *requestsWaiter) GetWaiterCount() int {
r.mu.Lock()
defer r.mu.Unlock()
return r.waitCounter
}

// GetOutstandingRequestsCount gets the number of requests outstanding.
func (r *requestsWaiter) GetOutstandingRequestsCount() int {
r.mu.Lock()
defer r.mu.Unlock()
return r.counter
}
57 changes: 57 additions & 0 deletions go/vt/vttablet/tabletserver/requests_waiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tabletserver

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestRequestWaiter tests the functionality of request waiter.
func TestRequestWaiter(t *testing.T) {
rw := newRequestsWaiter()
require.EqualValues(t, 0, rw.GetWaiterCount())
require.EqualValues(t, 0, rw.GetOutstandingRequestsCount())

rw.Add(3)
require.EqualValues(t, 0, rw.GetWaiterCount())
require.EqualValues(t, 3, rw.GetOutstandingRequestsCount())

rw.Done()
require.EqualValues(t, 0, rw.GetWaiterCount())
require.EqualValues(t, 2, rw.GetOutstandingRequestsCount())

go func() {
rw.WaitToBeEmpty()
}()
go func() {
rw.WaitToBeEmpty()
}()

time.Sleep(100 * time.Millisecond)
require.EqualValues(t, 2, rw.GetWaiterCount())
require.EqualValues(t, 2, rw.GetOutstandingRequestsCount())

rw.Done()
rw.Done()

time.Sleep(100 * time.Millisecond)
require.EqualValues(t, 0, rw.GetWaiterCount())
require.EqualValues(t, 0, rw.GetOutstandingRequestsCount())
}
Loading
Loading