Skip to content

Commit

Permalink
Use replica queries when available (#15808)
Browse files Browse the repository at this point in the history
Signed-off-by: Dirkjan Bussink <[email protected]>
  • Loading branch information
dbussink authored Apr 30, 2024
1 parent 7ba6f5b commit ba0dfa4
Show file tree
Hide file tree
Showing 47 changed files with 1,105 additions and 681 deletions.
4 changes: 2 additions & 2 deletions examples/compose/fix_replication.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ function get_replication_status() {
function reset_replication() {
# Necessary before sql file can be imported
echo "Importing MysqlDump: $KEYSPACE.sql"
mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "RESET MASTER;STOP SLAVE;CHANGE MASTER TO MASTER_AUTO_POSITION = 0;source $KEYSPACE.sql;START SLAVE;"
mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "RESET MASTER;STOP REPLICA;CHANGE REPLICATION SOURCE TO SOURCE_AUTO_POSITION = 0;source $KEYSPACE.sql;START REPLICA;"
# Restore Master Auto Position
echo "Restoring Master Auto Setting"
mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "STOP SLAVE;CHANGE MASTER TO MASTER_AUTO_POSITION = 1;START SLAVE;"
mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "STOP REPLICA;CHANGE REPLICATION SOURCE TO SOURCE_AUTO_POSITION = 1;START REPLICA;"
}

# Retrieve replication status
Expand Down
15 changes: 7 additions & 8 deletions go/cmd/vtbackup/cli/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
return fmt.Errorf("can't reset replication: %v", err)
}
// We need to switch off super_read_only before we create the database.
resetFunc, err := mysqld.SetSuperReadOnly(false)
resetFunc, err := mysqld.SetSuperReadOnly(ctx, false)
if err != nil {
return fmt.Errorf("failed to disable super_read_only during backup: %v", err)
}
Expand Down Expand Up @@ -528,7 +528,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
}

lastStatus = status
status, statusErr = mysqld.ReplicationStatus()
status, statusErr = mysqld.ReplicationStatus(ctx)
if statusErr != nil {
log.Warningf("Error getting replication status: %v", statusErr)
continue
Expand Down Expand Up @@ -560,12 +560,12 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
phase.Set(phaseNameCatchupReplication, int64(0))

// Stop replication and see where we are.
if err := mysqld.StopReplication(nil); err != nil {
if err := mysqld.StopReplication(ctx, nil); err != nil {
return fmt.Errorf("can't stop replication: %v", err)
}

// Did we make any progress?
status, statusErr = mysqld.ReplicationStatus()
status, statusErr = mysqld.ReplicationStatus(ctx)
if statusErr != nil {
return fmt.Errorf("can't get replication status: %v", err)
}
Expand Down Expand Up @@ -621,11 +621,10 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
}

func resetReplication(ctx context.Context, pos replication.Position, mysqld mysqlctl.MysqlDaemon) error {
cmds := []string{
"STOP SLAVE",
"RESET SLAVE ALL", // "ALL" makes it forget replication source host:port.
if err := mysqld.StopReplication(ctx, nil); err != nil {
return vterrors.Wrap(err, "failed to stop replication")
}
if err := mysqld.ExecuteSuperQueryList(ctx, cmds); err != nil {
if err := mysqld.ResetReplicationParameters(ctx); err != nil {
return vterrors.Wrap(err, "failed to reset replication")
}

Expand Down
8 changes: 4 additions & 4 deletions go/cmd/vtcombo/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func run(cmd *cobra.Command, args []string) (err error) {
mysqld.Shutdown(ctx, cnf, true, mysqlctl.DefaultShutdownTimeout)
})
// We want to ensure we can write to this database
mysqld.SetReadOnly(false)
mysqld.SetReadOnly(cmd.Context(), false)

} else {
dbconfigs.GlobalDBConfigs.InitWithSocket("", env.CollationEnv())
Expand Down Expand Up @@ -368,12 +368,12 @@ func (mysqld *vtcomboMysqld) SetReplicationSource(ctx context.Context, host stri
}

// StartReplication implements the MysqlDaemon interface
func (mysqld *vtcomboMysqld) StartReplication(hookExtraEnv map[string]string) error {
func (mysqld *vtcomboMysqld) StartReplication(ctx context.Context, hookExtraEnv map[string]string) error {
return nil
}

// RestartReplication implements the MysqlDaemon interface
func (mysqld *vtcomboMysqld) RestartReplication(hookExtraEnv map[string]string) error {
func (mysqld *vtcomboMysqld) RestartReplication(ctx context.Context, hookExtraEnv map[string]string) error {
return nil
}

Expand All @@ -383,7 +383,7 @@ func (mysqld *vtcomboMysqld) StartReplicationUntilAfter(ctx context.Context, pos
}

// StopReplication implements the MysqlDaemon interface
func (mysqld *vtcomboMysqld) StopReplication(hookExtraEnv map[string]string) error {
func (mysqld *vtcomboMysqld) StopReplication(ctx context.Context, hookExtraEnv map[string]string) error {
return nil
}

Expand Down
7 changes: 7 additions & 0 deletions go/mysql/capabilities/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
CheckConstraintsCapability // supported in MySQL 8.0.16 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-16.html
PerformanceSchemaDataLocksTableCapability // supported in MySQL 8.0.1 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-1.html
InstantDDLXtrabackupCapability // Supported in 8.0.32 and above, solving a MySQL-vs-Xtrabackup bug starting 8.0.29
ReplicaTerminologyCapability // Supported in 8.0.26 and above, using SHOW REPLICA STATUS and all variations.
)

type CapableOf func(capability FlavorCapability) (bool, error)
Expand Down Expand Up @@ -112,6 +113,12 @@ func MySQLVersionHasCapability(serverVersion string, capability FlavorCapability
return atLeast(8, 0, 30)
case InstantDDLXtrabackupCapability:
return atLeast(8, 0, 32)
case ReplicaTerminologyCapability:
// In MySQL 8.0.22 the new replica syntax was introduced, but other changes
// like the log_replica_updates field was only present in 8.0.26 and newer.
// So be conservative here, and only use the new syntax on newer versions,
// so we don't have to have too many different flavors.
return atLeast(8, 0, 26)
default:
return false, nil
}
Expand Down
71 changes: 33 additions & 38 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,16 @@ const (
mariaDBReplicationHackPrefix = "5.5.5-"
// mariaDBVersionString is present in
mariaDBVersionString = "MariaDB"
// mysql57VersionPrefix is the prefix for 5.7 mysql version, such as 5.7.31-log
mysql57VersionPrefix = "5.7."
// mysql80VersionPrefix is the prefix for 8.0 mysql version, such as 8.0.19
mysql80VersionPrefix = "8.0."
// mysql8VersionPrefix is the prefix for 8.x mysql version, such as 8.0.19,
// but also newer ones like 8.4.0.
mysql8VersionPrefix = "8."
)

// flavor is the abstract interface for a flavor.
// Flavors are auto-detected upon connection using the server version.
// We have two major implementations (the main difference is the GTID
// handling):
// 1. Oracle MySQL 5.6, 5.7, 8.0, ...
// 1. Oracle MySQL 5.7, 8.0, ...
// 2. MariaDB 10.X
type flavor interface {
// primaryGTIDSet returns the current GTIDSet of a server.
Expand Down Expand Up @@ -88,6 +87,9 @@ type flavor interface {
// stopReplicationCommand returns the command to stop the replication.
stopReplicationCommand() string

// resetReplicationCommand returns the command to reset the replication.
resetReplicationCommand() string

// stopIOThreadCommand returns the command to stop the replica's IO thread only.
stopIOThreadCommand() string

Expand Down Expand Up @@ -116,9 +118,9 @@ type flavor interface {
// replication position at which the replica will resume.
setReplicationPositionCommands(pos replication.Position) []string

// changeReplicationSourceArg returns the specific parameter to add to
// a "change primary" command.
changeReplicationSourceArg() string
// setReplicationSourceCommand returns the command to use the provided host/port
// as the new replication source (without changing any GTID position).
setReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string

// status returns the result of the appropriate status command,
// with parsed replication position.
Expand All @@ -132,6 +134,11 @@ type flavor interface {
// until the context expires. It returns an error if we did not
// succeed.
waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error
// catchupToGTIDCommands returns the command to catch up to a given GTID.
catchupToGTIDCommands(params *ConnParams, pos replication.Position) []string

// binlogReplicatedUpdates returns the field to use to check replica updates.
binlogReplicatedUpdates() string

baseShowTables() string
baseShowTablesWithSizes() string
Expand Down Expand Up @@ -171,13 +178,16 @@ func GetFlavor(serverVersion string, flavorFunc func() flavor) (f flavor, capabl
} else {
f = mariadbFlavor102{mariadbFlavor{serverVersion: fmt.Sprintf("%f", mariadbVersion)}}
}
case strings.HasPrefix(serverVersion, mysql57VersionPrefix):
f = mysqlFlavor57{mysqlFlavor{serverVersion: serverVersion}}
case strings.HasPrefix(serverVersion, mysql80VersionPrefix):
f = mysqlFlavor80{mysqlFlavor{serverVersion: serverVersion}}
case strings.HasPrefix(serverVersion, mysql8VersionPrefix):
recent, _ := capabilities.MySQLVersionHasCapability(serverVersion, capabilities.ReplicaTerminologyCapability)
if recent {
f = mysqlFlavor8{mysqlFlavor{serverVersion: serverVersion}}
} else {
f = mysqlFlavor8Legacy{mysqlFlavor{serverVersion: serverVersion}}
}
default:
// If unknown, return the most basic flavor: MySQL 56.
f = mysqlFlavor56{mysqlFlavor{serverVersion: serverVersion}}
// If unknown, return the most basic flavor: MySQL 57.
f = mysqlFlavor57{mysqlFlavor{serverVersion: serverVersion}}
}
return f, f.supportsCapability, canonicalVersion
}
Expand Down Expand Up @@ -299,6 +309,10 @@ func (c *Conn) StopReplicationCommand() string {
return c.flavor.stopReplicationCommand()
}

func (c *Conn) ResetReplicationCommand() string {
return c.flavor.resetReplicationCommand()
}

// StopIOThreadCommand returns the command to stop the replica's io thread.
func (c *Conn) StopIOThreadCommand() string {
return c.flavor.stopIOThreadCommand()
Expand Down Expand Up @@ -351,30 +365,7 @@ func (c *Conn) SetReplicationPositionCommands(pos replication.Position) []string
// It is guaranteed to be called with replication stopped.
// It should not start or stop replication.
func (c *Conn) SetReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
args := []string{
fmt.Sprintf("MASTER_HOST = '%s'", host),
fmt.Sprintf("MASTER_PORT = %d", port),
fmt.Sprintf("MASTER_USER = '%s'", params.Uname),
fmt.Sprintf("MASTER_PASSWORD = '%s'", params.Pass),
fmt.Sprintf("MASTER_CONNECT_RETRY = %d", connectRetry),
}
if params.SslEnabled() {
args = append(args, "MASTER_SSL = 1")
}
if params.SslCa != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_CA = '%s'", params.SslCa))
}
if params.SslCaPath != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_CAPATH = '%s'", params.SslCaPath))
}
if params.SslCert != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_CERT = '%s'", params.SslCert))
}
if params.SslKey != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_KEY = '%s'", params.SslKey))
}
args = append(args, c.flavor.changeReplicationSourceArg())
return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ")
return c.flavor.setReplicationSourceCommand(params, host, port, connectRetry)
}

// resultToMap is a helper function used by ShowReplicationStatus.
Expand Down Expand Up @@ -415,6 +406,10 @@ func (c *Conn) WaitUntilPosition(ctx context.Context, pos replication.Position)
return c.flavor.waitUntilPosition(ctx, c, pos)
}

func (c *Conn) CatchupToGTIDCommands(params *ConnParams, pos replication.Position) []string {
return c.flavor.catchupToGTIDCommands(params, pos)
}

// WaitUntilFilePosition waits until the given position is reached or until
// the context expires for the file position flavor. It returns an error if
// we did not succeed.
Expand Down
16 changes: 14 additions & 2 deletions go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (flv *filePosFlavor) startReplicationCommand() string {
return "unsupported"
}

func (flv *filePosFlavor) resetReplicationCommand() string {
return "unsupported"
}

func (flv *filePosFlavor) restartReplicationCommands() []string {
return []string{"unsupported"}
}
Expand Down Expand Up @@ -223,8 +227,8 @@ func (flv *filePosFlavor) setReplicationPositionCommands(pos replication.Positio
}
}

// setReplicationPositionCommands is part of the Flavor interface.
func (flv *filePosFlavor) changeReplicationSourceArg() string {
// setReplicationSourceCommand is part of the Flavor interface.
func (flv *filePosFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
return "unsupported"
}

Expand Down Expand Up @@ -342,3 +346,11 @@ func (*filePosFlavor) supportsCapability(capability capabilities.FlavorCapabilit
return false, nil
}
}

func (*filePosFlavor) catchupToGTIDCommands(_ *ConnParams, _ replication.Position) []string {
return []string{"unsupported"}
}

func (*filePosFlavor) binlogReplicatedUpdates() string {
return "@@global.log_slave_updates"
}
41 changes: 38 additions & 3 deletions go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"io"
"strings"
"time"

"vitess.io/vitess/go/mysql/capabilities"
Expand Down Expand Up @@ -97,6 +98,10 @@ func (mariadbFlavor) stopReplicationCommand() string {
return "STOP SLAVE"
}

func (mariadbFlavor) resetReplicationCommand() string {
return "RESET SLAVE ALL"
}

func (mariadbFlavor) stopIOThreadCommand() string {
return "STOP SLAVE IO_THREAD"
}
Expand Down Expand Up @@ -182,9 +187,31 @@ func (mariadbFlavor) setReplicationPositionCommands(pos replication.Position) []
}
}

// setReplicationPositionCommands is part of the Flavor interface.
func (mariadbFlavor) changeReplicationSourceArg() string {
return "MASTER_USE_GTID = current_pos"
func (mariadbFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
args := []string{
fmt.Sprintf("MASTER_HOST = '%s'", host),
fmt.Sprintf("MASTER_PORT = %d", port),
fmt.Sprintf("MASTER_USER = '%s'", params.Uname),
fmt.Sprintf("MASTER_PASSWORD = '%s'", params.Pass),
fmt.Sprintf("MASTER_CONNECT_RETRY = %d", connectRetry),
}
if params.SslEnabled() {
args = append(args, "MASTER_SSL = 1")
}
if params.SslCa != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_CA = '%s'", params.SslCa))
}
if params.SslCaPath != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_CAPATH = '%s'", params.SslCaPath))
}
if params.SslCert != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_CERT = '%s'", params.SslCert))
}
if params.SslKey != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_KEY = '%s'", params.SslKey))
}
args = append(args, "MASTER_USE_GTID = current_pos")
return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ")
}

// status is part of the Flavor interface.
Expand Down Expand Up @@ -296,3 +323,11 @@ func (mariadbFlavor) supportsCapability(capability capabilities.FlavorCapability
return false, nil
}
}

func (mariadbFlavor) catchupToGTIDCommands(_ *ConnParams, _ replication.Position) []string {
return []string{"unsupported"}
}

func (mariadbFlavor) binlogReplicatedUpdates() string {
return "@@global.log_slave_updates"
}
Loading

0 comments on commit ba0dfa4

Please sign in to comment.