Skip to content

Commit

Permalink
sink(ticdc): Implement prep stmt cache in mysql sink (#8140)
Browse files Browse the repository at this point in the history
close #7342
  • Loading branch information
dbsid authored Feb 23, 2023
1 parent 7427143 commit d751f93
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 15 deletions.
52 changes: 48 additions & 4 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

dmysql "github.com/go-sql-driver/mysql"
lru "github.com/hashicorp/golang-lru"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
Expand Down Expand Up @@ -65,6 +66,8 @@ type mysqlBackend struct {
statistics *metrics.Statistics
metricTxnSinkDMLBatchCommit prometheus.Observer
metricTxnSinkDMLBatchCallback prometheus.Observer
// implement stmtCache to improve performance, especially when the downstream is TiDB
stmtCache *lru.Cache
}

// NewMySQLBackends creates a new MySQL sink using schema storage
Expand Down Expand Up @@ -99,8 +102,28 @@ func NewMySQLBackends(
return nil, err
}

db.SetMaxIdleConns(cfg.WorkerCount)
db.SetMaxOpenConns(cfg.WorkerCount)
// By default, cache-prep-stmts=true, an LRU cache is used for prepared statements,
// two connections are required to process a transaction.
// The first connection is held in the tx variable, which is used to manage the transaction.
// The second connection is requested through a call to s.db.Prepare
// in case of a cache miss for the statement query.
// The connection pool for CDC is configured with a static size, equal to the number of workers.
// CDC may hang at the "Get Connection" call due to the limited size of the connection pool.
// When the connection pool is small,
// the chance of all connections being active at the same time increases,
// leading to exhaustion of available connections and a hang at the "Get Connection" call.
// This issue is less likely to occur when the connection pool is larger,
// as there are more connections available for use.
// Adding an extra connection to the connection pool solves the connection exhaustion issue.
db.SetMaxIdleConns(cfg.WorkerCount + 1)
db.SetMaxOpenConns(cfg.WorkerCount + 1)
stmtCache, err := lru.NewWithEvict(cfg.PrepStmtCacheSize, func(key, value interface{}) {
stmt := value.(*sql.Stmt)
stmt.Close()
})
if err != nil {
return nil, err
}

backends := make([]*mysqlBackend, 0, cfg.WorkerCount)
for i := 0; i < cfg.WorkerCount; i++ {
Expand All @@ -114,6 +137,7 @@ func NewMySQLBackends(

metricTxnSinkDMLBatchCommit: txn.SinkDMLBatchCommit.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnSinkDMLBatchCallback: txn.SinkDMLBatchCallback.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
stmtCache: stmtCache,
})
}

Expand Down Expand Up @@ -182,6 +206,9 @@ func (s *mysqlBackend) Flush(ctx context.Context) (err error) {

// Close implements interface backend.
func (s *mysqlBackend) Close() (err error) {
if s.stmtCache != nil {
s.stmtCache.Purge()
}
if s.db != nil {
err = s.db.Close()
s.db = nil
Expand Down Expand Up @@ -578,9 +605,26 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
log.Debug("exec row", zap.Int("workerID", s.workerID),
zap.String("sql", query), zap.Any("args", args))
ctx, cancelFunc := context.WithTimeout(pctx, writeTimeout)
if _, err := tx.ExecContext(ctx, query, args...); err != nil {
var execError error
if s.cfg.CachePrepStmts {
stmt, ok := s.stmtCache.Get(query)
if !ok {
var err error
stmt, err = s.db.Prepare(query)
if err != nil {
cancelFunc()
return 0, errors.Trace(err)
}

s.stmtCache.Add(query, stmt)
}
_, execError = tx.Stmt(stmt.(*sql.Stmt)).ExecContext(ctx, args...)
} else {
_, execError = tx.ExecContext(ctx, query, args...)
}
if execError != nil {
err := logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
cerror.WrapError(cerror.ErrMySQLTxnError, execError),
start, s.changefeed, query, dmls.rowCount, dmls.startTs)
if rbErr := tx.Rollback(); rbErr != nil {
if errors.Cause(rbErr) != context.Canceled {
Expand Down
23 changes: 17 additions & 6 deletions cdc/sink/dmlsink/txn/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,12 @@ func TestNewMySQLBackendExecDML(t *testing.T) {
defer cancel()
changefeed := "test-changefeed"
contextutil.PutChangefeedIDInCtx(ctx, model.DefaultChangeFeedID(changefeed))
sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1")
// TODO: Test txn sink behavior when cache-prep-stmts is true.
// Earlier attempts to write such tests failed because there is no known way
// to prepare a statement on one sqlmock connection
// and execute it on another sqlmock connection.
sinkURI, err := url.Parse(
"mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1&cache-prep-stmts=false")
require.Nil(t, err)
sink, err := newMySQLBackend(ctx, sinkURI,
config.GetDefaultReplicaConfig(), mockGetDBConn)
Expand Down Expand Up @@ -411,7 +416,8 @@ func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) {
defer cancel()
changefeed := "test-changefeed"
contextutil.PutChangefeedIDInCtx(ctx, model.DefaultChangeFeedID(changefeed))
sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1")
sinkURI, err := url.Parse(
"mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1&cache-prep-stmts=false")
require.Nil(t, err)
sink, err := newMySQLBackend(ctx, sinkURI,
config.GetDefaultReplicaConfig(), mockGetDBConnErrDatabaseNotExists)
Expand Down Expand Up @@ -483,7 +489,8 @@ func TestExecDMLRollbackErrTableNotExists(t *testing.T) {
defer cancel()
changefeed := "test-changefeed"
contextutil.PutChangefeedIDInCtx(ctx, model.DefaultChangeFeedID(changefeed))
sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1")
sinkURI, err := url.Parse(
"mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1&cache-prep-stmts=false")
require.Nil(t, err)
sink, err := newMySQLBackend(ctx, sinkURI,
config.GetDefaultReplicaConfig(), mockGetDBConnErrDatabaseNotExists)
Expand Down Expand Up @@ -557,7 +564,8 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) {
defer cancel()
changefeed := "test-changefeed"
contextutil.PutChangefeedIDInCtx(ctx, model.DefaultChangeFeedID(changefeed))
sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1")
sinkURI, err := url.Parse(
"mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1&cache-prep-stmts=false")
require.Nil(t, err)
sink, err := newMySQLBackend(ctx, sinkURI,
config.GetDefaultReplicaConfig(), mockGetDBConnErrDatabaseNotExists)
Expand Down Expand Up @@ -620,7 +628,9 @@ func TestMysqlSinkNotRetryErrDupEntry(t *testing.T) {
defer cancel()
changefeed := "test-changefeed"
contextutil.PutChangefeedIDInCtx(ctx, model.DefaultChangeFeedID(changefeed))
sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1&safe-mode=false")
sinkURI, err := url.Parse(
"mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1&safe-mode=false" +
"&cache-prep-stmts=false")
require.Nil(t, err)
sink, err := newMySQLBackend(ctx, sinkURI,
config.GetDefaultReplicaConfig(), mockDBInsertDupEntry)
Expand Down Expand Up @@ -794,7 +804,8 @@ func TestMySQLSinkExecDMLError(t *testing.T) {
defer cancel()
changefeed := "test-changefeed"
contextutil.PutChangefeedIDInCtx(ctx, model.DefaultChangeFeedID(changefeed))
sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1")
sinkURI, err := url.Parse(
"mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1&cache-prep-stmts=false")
require.Nil(t, err)
sink, err := newMySQLBackend(ctx, sinkURI,
config.GetDefaultReplicaConfig(), mockGetDBConn)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.0
github.com/hashicorp/golang-lru v0.5.1
github.com/integralist/go-findroot v0.0.0-20160518114804-ac90681525dc
github.com/jarcoal/httpmock v1.2.0
github.com/jmoiron/sqlx v1.3.3
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
Expand Down
3 changes: 2 additions & 1 deletion pkg/applier/redo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ func TestApplyDMLs(t *testing.T) {
close(ddlEventCh)

cfg := &RedoApplierConfig{
SinkURI: "mysql://127.0.0.1:4000/?worker-count=1&max-txn-row=1&tidb_placement_mode=ignore&safe-mode=true",
SinkURI: "mysql://127.0.0.1:4000/?worker-count=1&max-txn-row=1" +
"&tidb_placement_mode=ignore&safe-mode=true&cache-prep-stmts=false",
}
ap := NewRedoApplier(cfg)
err := ap.Apply(ctx)
Expand Down
58 changes: 55 additions & 3 deletions pkg/sink/mysql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ const (
BackoffMaxDelay = 60 * time.Second

defaultBatchDMLEnable = true

// defaultCachePrepStmts is the default value of CachePrepStmts
defaultCachePrepStmts = true
// defaultPrepStmtCacheSize is the default size of prepared statement cache
defaultPrepStmtCacheSize = 10000
// The upper limit of the max size of prepared statement cache
maxPrepStmtCacheSize = 1000000
)

// Config is the configs for MySQL backend.
Expand All @@ -90,9 +97,11 @@ type Config struct {
ForceReplicate bool
EnableOldValue bool

IsTiDB bool // IsTiDB is true if the downstream is TiDB
SourceID uint64
BatchDMLEnable bool
IsTiDB bool // IsTiDB is true if the downstream is TiDB
SourceID uint64
BatchDMLEnable bool
CachePrepStmts bool
PrepStmtCacheSize int
}

// NewConfig returns the default mysql backend config.
Expand All @@ -108,6 +117,8 @@ func NewConfig() *Config {
DialTimeout: defaultDialTimeout,
SafeMode: defaultSafeMode,
BatchDMLEnable: defaultBatchDMLEnable,
CachePrepStmts: defaultCachePrepStmts,
PrepStmtCacheSize: defaultPrepStmtCacheSize,
}
}

Expand Down Expand Up @@ -163,6 +174,12 @@ func (c *Config) Apply(
if err = getBatchDMLEnable(query, &c.BatchDMLEnable); err != nil {
return err
}
if err = getCachePrepStmts(query, &c.CachePrepStmts); err != nil {
return err
}
if err = getPrepStmtCacheSize(query, &c.PrepStmtCacheSize); err != nil {
return err
}
c.EnableOldValue = replicaConfig.EnableOldValue
c.ForceReplicate = replicaConfig.ForceReplicate
c.SourceID = replicaConfig.Sink.TiDBSourceID
Expand Down Expand Up @@ -365,3 +382,38 @@ func getBatchDMLEnable(values url.Values, batchDMLEnable *bool) error {
}
return nil
}

// getCachePrepStmts reads the optional "cache-prep-stmts" query parameter and,
// when it is present and non-empty, stores its boolean value in cachePrepStmts.
//
// An absent or empty parameter leaves *cachePrepStmts untouched. A value that
// strconv.ParseBool rejects is reported as an ErrMySQLInvalidConfig error.
func getCachePrepStmts(values url.Values, cachePrepStmts *bool) error {
	raw := values.Get("cache-prep-stmts")
	if raw == "" {
		// Nothing supplied: keep whatever the caller already configured.
		return nil
	}

	parsed, parseErr := strconv.ParseBool(raw)
	if parseErr != nil {
		return cerror.WrapError(cerror.ErrMySQLInvalidConfig, parseErr)
	}

	*cachePrepStmts = parsed
	return nil
}

// getPrepStmtCacheSize reads the optional "prep-stmt-cache-size" query
// parameter and, when it is present and non-empty, stores the resulting size
// in prepStmtCacheSize.
//
// An absent or empty parameter leaves *prepStmtCacheSize untouched. A value
// that is not an integer, or that is not greater than 0, is reported as an
// ErrMySQLInvalidConfig error. A value above maxPrepStmtCacheSize is not an
// error: a warning is logged and the size is capped at maxPrepStmtCacheSize.
func getPrepStmtCacheSize(values url.Values, prepStmtCacheSize *int) error {
	raw := values.Get("prep-stmt-cache-size")
	if raw == "" {
		// Nothing supplied: keep whatever the caller already configured.
		return nil
	}

	size, convErr := strconv.Atoi(raw)
	switch {
	case convErr != nil:
		return cerror.WrapError(cerror.ErrMySQLInvalidConfig, convErr)
	case size <= 0:
		return cerror.WrapError(cerror.ErrMySQLInvalidConfig,
			fmt.Errorf("invalid prep-stmt-cache-size %d, which must be greater than 0", size))
	case size > maxPrepStmtCacheSize:
		// Oversized requests are tolerated but capped at the upper limit.
		log.Warn("prep-stmt-cache-size too large",
			zap.Int("original", size), zap.Int("override", maxPrepStmtCacheSize))
		size = maxPrepStmtCacheSize
	}

	*prepStmtCacheSize = size
	return nil
}
18 changes: 17 additions & 1 deletion pkg/sink/mysql/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,14 @@ func TestApplySinkURIParamsToConfig(t *testing.T) {
expected.Timezone = `"UTC"`
expected.tidbTxnMode = "pessimistic"
expected.EnableOldValue = true
expected.CachePrepStmts = true
expected.PrepStmtCacheSize = 1000000
uriStr := "mysql://127.0.0.1:3306/?worker-count=64&max-txn-row=20" +
"&max-multi-update-row=80&max-multi-update-row-size=512" +
"&safe-mode=false" +
"&tidb-txn-mode=pessimistic" +
"&test-some-deprecated-config=true&test-deprecated-size-config=100"
"&test-some-deprecated-config=true&test-deprecated-size-config=100" +
"&cache-prep-stmts=true&prep-stmt-cache-size=1000000"
uri, err := url.Parse(uriStr)
require.Nil(t, err)
cfg := NewConfig()
Expand Down Expand Up @@ -257,6 +260,16 @@ func TestParseSinkURIOverride(t *testing.T) {
checker: func(sp *Config) {
require.EqualValues(t, sp.tidbTxnMode, defaultTiDBTxnMode)
},
}, {
uri: "mysql://127.0.0.1:3306/?prep-stmt-cache-size=100000000",
checker: func(sp *Config) {
require.EqualValues(t, sp.PrepStmtCacheSize, maxPrepStmtCacheSize)
},
}, {
uri: "mysql://127.0.0.1:3306/?cache-prep-stmts=false",
checker: func(sp *Config) {
require.EqualValues(t, sp.CachePrepStmts, false)
},
}}
ctx := context.TODO()
var uri *url.URL
Expand Down Expand Up @@ -295,6 +308,9 @@ func TestParseSinkURIBadQueryString(t *testing.T) {
"mysql://127.0.0.1:3306/?write-timeout=badduration",
"mysql://127.0.0.1:3306/?read-timeout=badduration",
"mysql://127.0.0.1:3306/?timeout=badduration",
"mysql://127.0.0.1:3306/?prep-stmt-cache-size=not-number",
"mysql://127.0.0.1:3306/?prep-stmt-cache-size=-1",
"mysql://127.0.0.1:3306/?prep-stmt-cache-size=0",
}
ctx := context.TODO()
var uri *url.URL
Expand Down
15 changes: 15 additions & 0 deletions third-party-license.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
This project includes code from go-sql-driver/mysql project.

https://github.com/go-sql-driver/mysql

Homepage: https://github.com/go-sql-driver/mysql
License: Mozilla Public License Version 2.0 https://github.com/go-sql-driver/mysql/blob/master/LICENSE

--------------------------------------------------------------------------------

This project includes code from golang-lru project.

https://github.com/hashicorp/golang-lru

Homepage: https://github.com/hashicorp/golang-lru
License: Mozilla Public License Version 2.0 https://github.com/hashicorp/golang-lru/blob/master/LICENSE

0 comments on commit d751f93

Please sign in to comment.