Skip to content

Commit

Permalink
VTOrc optimize TMC usage (vitessio#15356)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Apr 19, 2024
1 parent e60fbd7 commit 8ca5f00
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 78 deletions.
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtbackup.txt
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ Flags:
--stderrthreshold severityFlag logs at or above this threshold go to stderr (default 1)
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ Flags:
--tablet_hostname string if not empty, this hostname will be assumed instead of trying to resolve it
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtctld.txt
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ Flags:
--tablet_health_keep_alive duration close streaming tablet health connection if there are no requests for this long (default 5m0s)
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Flags:
--table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ Flags:
--tablet_hostname string if not empty, this hostname will be assumed instead of trying to resolve it
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vttestserver.txt
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ Flags:
--tablet_hostname string The hostname to use for the tablet otherwise it will be derived from OS' hostname (default "localhost")
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
88 changes: 44 additions & 44 deletions go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named

var waitGroup sync.WaitGroup
var tablet *topodatapb.Tablet
var fullStatus *replicationdatapb.FullStatus
var fs *replicationdatapb.FullStatus
readingStartTime := time.Now()
instance := NewInstance()
instanceFound := false
Expand Down Expand Up @@ -208,7 +208,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named
goto Cleanup
}

fullStatus, err = FullStatus(tabletAlias)
fs, err = fullStatus(tabletAlias)
if err != nil {
goto Cleanup
}
Expand All @@ -218,48 +218,48 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named
instance.Port = int(tablet.MysqlPort)
{
// We begin with a few operations we can run concurrently, and which do not depend on anything
instance.ServerID = uint(fullStatus.ServerId)
instance.Version = fullStatus.Version
instance.ReadOnly = fullStatus.ReadOnly
instance.LogBinEnabled = fullStatus.LogBinEnabled
instance.BinlogFormat = fullStatus.BinlogFormat
instance.LogReplicationUpdatesEnabled = fullStatus.LogReplicaUpdates
instance.VersionComment = fullStatus.VersionComment

if instance.LogBinEnabled && fullStatus.PrimaryStatus != nil {
binlogPos, err := getBinlogCoordinatesFromPositionString(fullStatus.PrimaryStatus.FilePosition)
instance.ServerID = uint(fs.ServerId)
instance.Version = fs.Version
instance.ReadOnly = fs.ReadOnly
instance.LogBinEnabled = fs.LogBinEnabled
instance.BinlogFormat = fs.BinlogFormat
instance.LogReplicationUpdatesEnabled = fs.LogReplicaUpdates
instance.VersionComment = fs.VersionComment

if instance.LogBinEnabled && fs.PrimaryStatus != nil {
binlogPos, err := getBinlogCoordinatesFromPositionString(fs.PrimaryStatus.FilePosition)
instance.SelfBinlogCoordinates = binlogPos
errorChan <- err
}

instance.SemiSyncPrimaryEnabled = fullStatus.SemiSyncPrimaryEnabled
instance.SemiSyncReplicaEnabled = fullStatus.SemiSyncReplicaEnabled
instance.SemiSyncPrimaryWaitForReplicaCount = uint(fullStatus.SemiSyncWaitForReplicaCount)
instance.SemiSyncPrimaryTimeout = fullStatus.SemiSyncPrimaryTimeout
instance.SemiSyncPrimaryEnabled = fs.SemiSyncPrimaryEnabled
instance.SemiSyncReplicaEnabled = fs.SemiSyncReplicaEnabled
instance.SemiSyncPrimaryWaitForReplicaCount = uint(fs.SemiSyncWaitForReplicaCount)
instance.SemiSyncPrimaryTimeout = fs.SemiSyncPrimaryTimeout

instance.SemiSyncPrimaryClients = uint(fullStatus.SemiSyncPrimaryClients)
instance.SemiSyncPrimaryStatus = fullStatus.SemiSyncPrimaryStatus
instance.SemiSyncReplicaStatus = fullStatus.SemiSyncReplicaStatus
instance.SemiSyncPrimaryClients = uint(fs.SemiSyncPrimaryClients)
instance.SemiSyncPrimaryStatus = fs.SemiSyncPrimaryStatus
instance.SemiSyncReplicaStatus = fs.SemiSyncReplicaStatus

if instance.IsOracleMySQL() || instance.IsPercona() {
// Stuff only supported on Oracle / Percona MySQL
// ...
// @@gtid_mode only available in Oracle / Percona MySQL >= 5.6
instance.GTIDMode = fullStatus.GtidMode
instance.ServerUUID = fullStatus.ServerUuid
if fullStatus.PrimaryStatus != nil {
GtidExecutedPos, err := replication.DecodePosition(fullStatus.PrimaryStatus.Position)
instance.GTIDMode = fs.GtidMode
instance.ServerUUID = fs.ServerUuid
if fs.PrimaryStatus != nil {
GtidExecutedPos, err := replication.DecodePosition(fs.PrimaryStatus.Position)
errorChan <- err
if err == nil && GtidExecutedPos.GTIDSet != nil {
instance.ExecutedGtidSet = GtidExecutedPos.GTIDSet.String()
}
}
GtidPurgedPos, err := replication.DecodePosition(fullStatus.GtidPurged)
GtidPurgedPos, err := replication.DecodePosition(fs.GtidPurged)
errorChan <- err
if err == nil && GtidPurgedPos.GTIDSet != nil {
instance.GtidPurged = GtidPurgedPos.GTIDSet.String()
}
instance.BinlogRowImage = fullStatus.BinlogRowImage
instance.BinlogRowImage = fs.BinlogRowImage

if instance.GTIDMode != "" && instance.GTIDMode != "OFF" {
instance.SupportsOracleGTID = true
Expand All @@ -269,45 +269,45 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named

instance.ReplicationIOThreadState = ReplicationThreadStateNoThread
instance.ReplicationSQLThreadState = ReplicationThreadStateNoThread
if fullStatus.ReplicationStatus != nil {
instance.HasReplicationCredentials = fullStatus.ReplicationStatus.SourceUser != ""
if fs.ReplicationStatus != nil {
instance.HasReplicationCredentials = fs.ReplicationStatus.SourceUser != ""

instance.ReplicationIOThreadState = ReplicationThreadStateFromReplicationState(replication.ReplicationState(fullStatus.ReplicationStatus.IoState))
instance.ReplicationSQLThreadState = ReplicationThreadStateFromReplicationState(replication.ReplicationState(fullStatus.ReplicationStatus.SqlState))
instance.ReplicationIOThreadState = ReplicationThreadStateFromReplicationState(replication.ReplicationState(fs.ReplicationStatus.IoState))
instance.ReplicationSQLThreadState = ReplicationThreadStateFromReplicationState(replication.ReplicationState(fs.ReplicationStatus.SqlState))
instance.ReplicationIOThreadRuning = instance.ReplicationIOThreadState.IsRunning()
instance.ReplicationSQLThreadRuning = instance.ReplicationSQLThreadState.IsRunning()

binlogPos, err := getBinlogCoordinatesFromPositionString(fullStatus.ReplicationStatus.RelayLogSourceBinlogEquivalentPosition)
binlogPos, err := getBinlogCoordinatesFromPositionString(fs.ReplicationStatus.RelayLogSourceBinlogEquivalentPosition)
instance.ReadBinlogCoordinates = binlogPos
errorChan <- err

binlogPos, err = getBinlogCoordinatesFromPositionString(fullStatus.ReplicationStatus.FilePosition)
binlogPos, err = getBinlogCoordinatesFromPositionString(fs.ReplicationStatus.FilePosition)
instance.ExecBinlogCoordinates = binlogPos
errorChan <- err
instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates()

binlogPos, err = getBinlogCoordinatesFromPositionString(fullStatus.ReplicationStatus.RelayLogFilePosition)
binlogPos, err = getBinlogCoordinatesFromPositionString(fs.ReplicationStatus.RelayLogFilePosition)
instance.RelaylogCoordinates = binlogPos
instance.RelaylogCoordinates.Type = RelayLog
errorChan <- err

instance.LastSQLError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(fullStatus.ReplicationStatus.LastSqlError), "")
instance.LastIOError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(fullStatus.ReplicationStatus.LastIoError), "")
instance.LastSQLError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(fs.ReplicationStatus.LastSqlError), "")
instance.LastIOError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(fs.ReplicationStatus.LastIoError), "")

instance.SQLDelay = fullStatus.ReplicationStatus.SqlDelay
instance.UsingOracleGTID = fullStatus.ReplicationStatus.AutoPosition
instance.UsingMariaDBGTID = fullStatus.ReplicationStatus.UsingGtid
instance.SourceUUID = fullStatus.ReplicationStatus.SourceUuid
instance.HasReplicationFilters = fullStatus.ReplicationStatus.HasReplicationFilters
instance.SQLDelay = fs.ReplicationStatus.SqlDelay
instance.UsingOracleGTID = fs.ReplicationStatus.AutoPosition
instance.UsingMariaDBGTID = fs.ReplicationStatus.UsingGtid
instance.SourceUUID = fs.ReplicationStatus.SourceUuid
instance.HasReplicationFilters = fs.ReplicationStatus.HasReplicationFilters

instance.SourceHost = fullStatus.ReplicationStatus.SourceHost
instance.SourcePort = int(fullStatus.ReplicationStatus.SourcePort)
instance.SourceHost = fs.ReplicationStatus.SourceHost
instance.SourcePort = int(fs.ReplicationStatus.SourcePort)

if fullStatus.ReplicationStatus.ReplicationLagUnknown {
if fs.ReplicationStatus.ReplicationLagUnknown {
instance.SecondsBehindPrimary.Valid = false
} else {
instance.SecondsBehindPrimary.Valid = true
instance.SecondsBehindPrimary.Int64 = int64(fullStatus.ReplicationStatus.ReplicationLagSeconds)
instance.SecondsBehindPrimary.Int64 = int64(fs.ReplicationStatus.ReplicationLagSeconds)
}
if instance.SecondsBehindPrimary.Valid && instance.SecondsBehindPrimary.Int64 < 0 {
log.Warningf("Alias: %+v, instance.SecondsBehindPrimary < 0 [%+v], correcting to 0", tabletAlias, instance.SecondsBehindPrimary.Int64)
Expand All @@ -316,7 +316,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named
// And until told otherwise:
instance.ReplicationLagSeconds = instance.SecondsBehindPrimary

instance.AllowTLS = fullStatus.ReplicationStatus.SslAllowed
instance.AllowTLS = fs.ReplicationStatus.SslAllowed
}

instanceFound = true
Expand Down
23 changes: 7 additions & 16 deletions go/vt/vtorc/inst/tablet_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,20 @@ import (

// ErrTabletAliasNil is a fixed error message.
var ErrTabletAliasNil = errors.New("tablet alias is nil")
var tmc tmclient.TabletManagerClient

// ResetReplicationParameters resets the replication parameters on the given tablet.
func ResetReplicationParameters(tabletAlias string) error {
tablet, err := ReadTablet(tabletAlias)
if err != nil {
return err
}
tmc := tmclient.NewTabletManagerClient()
tmcCtx, tmcCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer tmcCancel()
if err := tmc.ResetReplicationParameters(tmcCtx, tablet); err != nil {
return err
}
return nil
// InitializeTMC initializes the tablet manager client to use for all VTOrc RPC calls.
func InitializeTMC() tmclient.TabletManagerClient {
tmc = tmclient.NewTabletManagerClient()
return tmc
}

// FullStatus gets the full status of the MySQL running in vttablet.
func FullStatus(tabletAlias string) (*replicationdatapb.FullStatus, error) {
// fullStatus gets the full status of the MySQL running in vttablet.
func fullStatus(tabletAlias string) (*replicationdatapb.FullStatus, error) {
tablet, err := ReadTablet(tabletAlias)
if err != nil {
return nil, err
}
tmc := tmclient.NewTabletManagerClient()
tmcCtx, tmcCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer tmcCancel()
return tmc.FullStatus(tmcCtx, tablet)
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func RegisterFlags(fs *pflag.FlagSet) {
func OpenTabletDiscovery() <-chan time.Time {
// TODO(sougou): If there's a shutdown signal, we have to close the topo.
ts = topo.Open()
tmc = tmclient.NewTabletManagerClient()
tmc = inst.InitializeTMC()
// Clear existing cache and perform a new refresh.
if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil {
log.Error(err)
Expand Down Expand Up @@ -293,6 +293,11 @@ func changeTabletType(ctx context.Context, tablet *topodatapb.Tablet, tabletType
return tmc.ChangeType(ctx, tablet, tabletType, semiSync)
}

// resetReplicationParameters resets the replication parameters on the given tablet.
func resetReplicationParameters(ctx context.Context, tablet *topodatapb.Tablet) error {
return tmc.ResetReplicationParameters(ctx, tablet)
}

// setReplicationSource calls the said RPC with the parameters provided
func setReplicationSource(ctx context.Context, replica *topodatapb.Tablet, primary *topodatapb.Tablet, semiSync bool) error {
return tmc.SetReplicationSource(ctx, replica, primary.Alias, 0, "", true, semiSync)
Expand Down
Loading

0 comments on commit 8ca5f00

Please sign in to comment.