Skip to content

Commit

Permalink
Snapshot connection: revert to explicit table locks when FTWRL is u…
Browse files Browse the repository at this point in the history
…navailable (#14578)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Nov 23, 2023
1 parent b68b1ed commit 8b7ed8d
Showing 1 changed file with 41 additions and 1 deletion.
42 changes: 41 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/snapshot_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@ package vstreamer
import (
"context"
"fmt"
"math"
"strings"
"sync/atomic"
"time"

"github.com/spf13/pflag"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
)

// If the current binary log is greater than this byte size, we
Expand Down Expand Up @@ -241,8 +245,44 @@ func (conn *snapshotConn) startSnapshotAllTables(ctx context.Context) (gtid stri

log.Infof("Locking all tables")
if _, err := lockConn.ExecuteFetch("FLUSH TABLES WITH READ LOCK", 1, false); err != nil {
attemptExplicitTablesLocks := false
if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERAccessDeniedError {
// Access denied. On some systems this is either because the user doesn't have SUPER or RELOAD privileges.
// On some other systems, namely RDS, the command is just unsupported.
// There is an alternative way: run a `LOCK TABLES tbl1 READ, tbl2 READ, ...` for all tables. It not as
// efficient, and make a huge query, but still better than nothing.
attemptExplicitTablesLocks = true
}
log.Infof("Error locking all tables")
return "", err
if !attemptExplicitTablesLocks {
return "", err
}
// get list of all tables
rs, err := conn.ExecuteFetch("show full tables", math.MaxInt32, true)
if err != nil {
return "", err
}

var lockClauses []string
for _, row := range rs.Rows {
tableName := row[0].ToString()
tableType := row[1].ToString()
if tableType != "BASE TABLE" {
continue
}
tableName = sqlparser.String(sqlparser.NewIdentifierCS(tableName))
lockClause := fmt.Sprintf("%s read", tableName)
lockClauses = append(lockClauses, lockClause)
}
if len(lockClauses) > 0 {
query := fmt.Sprintf("lock tables %s", strings.Join(lockClauses, ","))
if _, err := lockConn.ExecuteFetch(query, 1, false); err != nil {
log.Error(vterrors.Wrapf(err, "explicitly locking all %v tables", len(lockClauses)))
return "", err
}
} else {
log.Infof("explicit lock tables: no tables found")
}
}
mpos, err := lockConn.PrimaryPosition()
if err != nil {
Expand Down

0 comments on commit 8b7ed8d

Please sign in to comment.