From d751f93de7bec5c19678a67708f4c4b14b0c0eca Mon Sep 17 00:00:00 2001 From: dbsid Date: Thu, 23 Feb 2023 12:05:06 +0800 Subject: [PATCH] sink(ticdc): Implement prep stmt cache in mysql sink (#8140) close pingcap/tiflow#7342 --- cdc/sink/dmlsink/txn/mysql/mysql.go | 52 +++++++++++++++++++-- cdc/sink/dmlsink/txn/mysql/mysql_test.go | 23 +++++++--- go.mod | 1 + go.sum | 1 + pkg/applier/redo_test.go | 3 +- pkg/sink/mysql/config.go | 58 ++++++++++++++++++++++-- pkg/sink/mysql/config_test.go | 18 +++++++- third-party-license.txt | 15 ++++++ 8 files changed, 156 insertions(+), 15 deletions(-) create mode 100644 third-party-license.txt diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index b51a2e36959..8a19178ecd9 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -22,6 +22,7 @@ import ( "time" dmysql "github.com/go-sql-driver/mysql" + lru "github.com/hashicorp/golang-lru" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" @@ -65,6 +66,8 @@ type mysqlBackend struct { statistics *metrics.Statistics metricTxnSinkDMLBatchCommit prometheus.Observer metricTxnSinkDMLBatchCallback prometheus.Observer + // implement stmtCache to improve performance, especially when the downstream is TiDB + stmtCache *lru.Cache } // NewMySQLBackends creates a new MySQL sink using schema storage @@ -99,8 +102,28 @@ func NewMySQLBackends( return nil, err } - db.SetMaxIdleConns(cfg.WorkerCount) - db.SetMaxOpenConns(cfg.WorkerCount) + // By default, cache-prep-stmts=true, an LRU cache is used for prepared statements, + // two connections are required to process a transaction. + // The first connection is held in the tx variable, which is used to manage the transaction. + // The second connection is requested through a call to s.db.Prepare + // in case of a cache miss for the statement query. + // The connection pool for CDC is configured with a static size, equal to the number of workers. 
+ // CDC may hang at the "Get Connection" call due to the limited size of the connection pool. + // When the connection pool is small, + // the chance of all connections being active at the same time increases, + // leading to exhaustion of available connections and a hang at the "Get Connection" call. + // This issue is less likely to occur when the connection pool is larger, + // as there are more connections available for use. + // Adding an extra connection to the connection pool solves the connection exhaustion issue. + db.SetMaxIdleConns(cfg.WorkerCount + 1) + db.SetMaxOpenConns(cfg.WorkerCount + 1) + stmtCache, err := lru.NewWithEvict(cfg.PrepStmtCacheSize, func(key, value interface{}) { + stmt := value.(*sql.Stmt) + stmt.Close() + }) + if err != nil { + return nil, err + } backends := make([]*mysqlBackend, 0, cfg.WorkerCount) for i := 0; i < cfg.WorkerCount; i++ { @@ -114,6 +137,7 @@ func NewMySQLBackends( metricTxnSinkDMLBatchCommit: txn.SinkDMLBatchCommit.WithLabelValues(changefeedID.Namespace, changefeedID.ID), metricTxnSinkDMLBatchCallback: txn.SinkDMLBatchCallback.WithLabelValues(changefeedID.Namespace, changefeedID.ID), + stmtCache: stmtCache, }) } @@ -182,6 +206,9 @@ func (s *mysqlBackend) Flush(ctx context.Context) (err error) { // Close implements interface backend. 
func (s *mysqlBackend) Close() (err error) { + if s.stmtCache != nil { + s.stmtCache.Purge() + } if s.db != nil { err = s.db.Close() s.db = nil @@ -578,9 +605,26 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare log.Debug("exec row", zap.Int("workerID", s.workerID), zap.String("sql", query), zap.Any("args", args)) ctx, cancelFunc := context.WithTimeout(pctx, writeTimeout) - if _, err := tx.ExecContext(ctx, query, args...); err != nil { + var execError error + if s.cfg.CachePrepStmts { + stmt, ok := s.stmtCache.Get(query) + if !ok { + var err error + stmt, err = s.db.Prepare(query) + if err != nil { + cancelFunc() + return 0, errors.Trace(err) + } + + s.stmtCache.Add(query, stmt) + } + _, execError = tx.Stmt(stmt.(*sql.Stmt)).ExecContext(ctx, args...) + } else { + _, execError = tx.ExecContext(ctx, query, args...) + } + if execError != nil { err := logDMLTxnErr( - cerror.WrapError(cerror.ErrMySQLTxnError, err), + cerror.WrapError(cerror.ErrMySQLTxnError, execError), start, s.changefeed, query, dmls.rowCount, dmls.startTs) if rbErr := tx.Rollback(); rbErr != nil { if errors.Cause(rbErr) != context.Canceled { diff --git a/cdc/sink/dmlsink/txn/mysql/mysql_test.go b/cdc/sink/dmlsink/txn/mysql/mysql_test.go index 9630544bff2..568ed05f04e 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql_test.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql_test.go @@ -288,7 +288,12 @@ func TestNewMySQLBackendExecDML(t *testing.T) { defer cancel() changefeed := "test-changefeed" contextutil.PutChangefeedIDInCtx(ctx, model.DefaultChangeFeedID(changefeed)) - sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1") + // TODO: Need to test txn sink behavior when cache-prep-stmts is true + // I did some attempts to write tests when cache-prep-stmts is true, but failed. + // The reason is that I can't find a way to prepare a statement in sqlmock connection, + // and execute it in another sqlmock connection. 
+ sinkURI, err := url.Parse( + "mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1&cache-prep-stmts=false") require.Nil(t, err) sink, err := newMySQLBackend(ctx, sinkURI, config.GetDefaultReplicaConfig(), mockGetDBConn) @@ -411,7 +416,8 @@ func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) { defer cancel() changefeed := "test-changefeed" contextutil.PutChangefeedIDInCtx(ctx, model.DefaultChangeFeedID(changefeed)) - sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1") + sinkURI, err := url.Parse( + "mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1&cache-prep-stmts=false") require.Nil(t, err) sink, err := newMySQLBackend(ctx, sinkURI, config.GetDefaultReplicaConfig(), mockGetDBConnErrDatabaseNotExists) @@ -483,7 +489,8 @@ func TestExecDMLRollbackErrTableNotExists(t *testing.T) { defer cancel() changefeed := "test-changefeed" contextutil.PutChangefeedIDInCtx(ctx, model.DefaultChangeFeedID(changefeed)) - sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1") + sinkURI, err := url.Parse( + "mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1&cache-prep-stmts=false") require.Nil(t, err) sink, err := newMySQLBackend(ctx, sinkURI, config.GetDefaultReplicaConfig(), mockGetDBConnErrDatabaseNotExists) @@ -557,7 +564,8 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) { defer cancel() changefeed := "test-changefeed" contextutil.PutChangefeedIDInCtx(ctx, model.DefaultChangeFeedID(changefeed)) - sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1") + sinkURI, err := url.Parse( + "mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1&cache-prep-stmts=false") require.Nil(t, err) sink, err := newMySQLBackend(ctx, sinkURI, config.GetDefaultReplicaConfig(), mockGetDBConnErrDatabaseNotExists) @@ -620,7 +628,9 @@ func TestMysqlSinkNotRetryErrDupEntry(t *testing.T) { defer cancel() changefeed := "test-changefeed" contextutil.PutChangefeedIDInCtx(ctx, 
model.DefaultChangeFeedID(changefeed)) - sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1&safe-mode=false") + sinkURI, err := url.Parse( + "mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1&safe-mode=false" + + "&cache-prep-stmts=false") require.Nil(t, err) sink, err := newMySQLBackend(ctx, sinkURI, config.GetDefaultReplicaConfig(), mockDBInsertDupEntry) @@ -794,7 +804,8 @@ func TestMySQLSinkExecDMLError(t *testing.T) { defer cancel() changefeed := "test-changefeed" contextutil.PutChangefeedIDInCtx(ctx, model.DefaultChangeFeedID(changefeed)) - sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1") + sinkURI, err := url.Parse( + "mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1&cache-prep-stmts=false") require.Nil(t, err) sink, err := newMySQLBackend(ctx, sinkURI, config.GetDefaultReplicaConfig(), mockGetDBConn) diff --git a/go.mod b/go.mod index 3f268c205cf..d0222438340 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.0 + github.com/hashicorp/golang-lru v0.5.1 github.com/integralist/go-findroot v0.0.0-20160518114804-ac90681525dc github.com/jarcoal/httpmock v1.2.0 github.com/jmoiron/sqlx v1.3.3 diff --git a/go.sum b/go.sum index 0ff079ebf16..5d13bf1487b 100644 --- a/go.sum +++ b/go.sum @@ -576,6 +576,7 @@ github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= 
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= diff --git a/pkg/applier/redo_test.go b/pkg/applier/redo_test.go index c17e11bf7d0..7895bbe613f 100644 --- a/pkg/applier/redo_test.go +++ b/pkg/applier/redo_test.go @@ -246,7 +246,8 @@ func TestApplyDMLs(t *testing.T) { close(ddlEventCh) cfg := &RedoApplierConfig{ - SinkURI: "mysql://127.0.0.1:4000/?worker-count=1&max-txn-row=1&tidb_placement_mode=ignore&safe-mode=true", + SinkURI: "mysql://127.0.0.1:4000/?worker-count=1&max-txn-row=1" + + "&tidb_placement_mode=ignore&safe-mode=true&cache-prep-stmts=false", } ap := NewRedoApplier(cfg) err := ap.Apply(ctx) diff --git a/pkg/sink/mysql/config.go b/pkg/sink/mysql/config.go index 9a24817fd1e..ff6422e4ad8 100644 --- a/pkg/sink/mysql/config.go +++ b/pkg/sink/mysql/config.go @@ -72,6 +72,13 @@ const ( BackoffMaxDelay = 60 * time.Second defaultBatchDMLEnable = true + + // defaultCachePrepStmts is the default value of CachePrepStmts + defaultCachePrepStmts = true + // defaultPrepStmtCacheSize is the default size of the prepared statement cache + defaultPrepStmtCacheSize = 10000 + // maxPrepStmtCacheSize is the upper limit of the prepared statement cache size + maxPrepStmtCacheSize = 1000000 ) // Config is the configs for MySQL backend. @@ -90,9 +97,11 @@ type Config struct { ForceReplicate bool EnableOldValue bool - IsTiDB bool // IsTiDB is true if the downstream is TiDB - SourceID uint64 - BatchDMLEnable bool + IsTiDB bool // IsTiDB is true if the downstream is TiDB + SourceID uint64 + BatchDMLEnable bool + CachePrepStmts bool + PrepStmtCacheSize int } // NewConfig returns the default mysql backend config. 
@@ -108,6 +117,8 @@ func NewConfig() *Config { DialTimeout: defaultDialTimeout, SafeMode: defaultSafeMode, BatchDMLEnable: defaultBatchDMLEnable, + CachePrepStmts: defaultCachePrepStmts, + PrepStmtCacheSize: defaultPrepStmtCacheSize, } } @@ -163,6 +174,12 @@ func (c *Config) Apply( if err = getBatchDMLEnable(query, &c.BatchDMLEnable); err != nil { return err } + if err = getCachePrepStmts(query, &c.CachePrepStmts); err != nil { + return err + } + if err = getPrepStmtCacheSize(query, &c.PrepStmtCacheSize); err != nil { + return err + } c.EnableOldValue = replicaConfig.EnableOldValue c.ForceReplicate = replicaConfig.ForceReplicate c.SourceID = replicaConfig.Sink.TiDBSourceID @@ -365,3 +382,38 @@ func getBatchDMLEnable(values url.Values, batchDMLEnable *bool) error { } return nil } + +func getCachePrepStmts(values url.Values, cachePrepStmts *bool) error { + s := values.Get("cache-prep-stmts") + if len(s) > 0 { + enable, err := strconv.ParseBool(s) + if err != nil { + return cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) + } + *cachePrepStmts = enable + } + return nil +} + +func getPrepStmtCacheSize(values url.Values, prepStmtCacheSize *int) error { + s := values.Get("prep-stmt-cache-size") + if len(s) == 0 { + return nil + } + + c, err := strconv.Atoi(s) + if err != nil { + return cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) + } + if c <= 0 { + return cerror.WrapError(cerror.ErrMySQLInvalidConfig, + fmt.Errorf("invalid prep-stmt-cache-size %d, which must be greater than 0", c)) + } + if c > maxPrepStmtCacheSize { + log.Warn("prep-stmt-cache-size too large", + zap.Int("original", c), zap.Int("override", maxPrepStmtCacheSize)) + c = maxPrepStmtCacheSize + } + *prepStmtCacheSize = c + return nil +} diff --git a/pkg/sink/mysql/config_test.go b/pkg/sink/mysql/config_test.go index 71b9d0ec5c7..aa3d9b42e56 100644 --- a/pkg/sink/mysql/config_test.go +++ b/pkg/sink/mysql/config_test.go @@ -187,11 +187,14 @@ func TestApplySinkURIParamsToConfig(t *testing.T) { 
expected.Timezone = `"UTC"` expected.tidbTxnMode = "pessimistic" expected.EnableOldValue = true + expected.CachePrepStmts = true + expected.PrepStmtCacheSize = 1000000 uriStr := "mysql://127.0.0.1:3306/?worker-count=64&max-txn-row=20" + "&max-multi-update-row=80&max-multi-update-row-size=512" + "&safe-mode=false" + "&tidb-txn-mode=pessimistic" + - "&test-some-deprecated-config=true&test-deprecated-size-config=100" + "&test-some-deprecated-config=true&test-deprecated-size-config=100" + + "&cache-prep-stmts=true&prep-stmt-cache-size=1000000" uri, err := url.Parse(uriStr) require.Nil(t, err) cfg := NewConfig() @@ -257,6 +260,16 @@ func TestParseSinkURIOverride(t *testing.T) { checker: func(sp *Config) { require.EqualValues(t, sp.tidbTxnMode, defaultTiDBTxnMode) }, + }, { + uri: "mysql://127.0.0.1:3306/?prep-stmt-cache-size=100000000", + checker: func(sp *Config) { + require.EqualValues(t, sp.PrepStmtCacheSize, maxPrepStmtCacheSize) + }, + }, { + uri: "mysql://127.0.0.1:3306/?cache-prep-stmts=false", + checker: func(sp *Config) { + require.EqualValues(t, sp.CachePrepStmts, false) + }, }} ctx := context.TODO() var uri *url.URL @@ -295,6 +308,9 @@ func TestParseSinkURIBadQueryString(t *testing.T) { "mysql://127.0.0.1:3306/?write-timeout=badduration", "mysql://127.0.0.1:3306/?read-timeout=badduration", "mysql://127.0.0.1:3306/?timeout=badduration", + "mysql://127.0.0.1:3306/?prep-stmt-cache-size=not-number", + "mysql://127.0.0.1:3306/?prep-stmt-cache-size=-1", + "mysql://127.0.0.1:3306/?prep-stmt-cache-size=0", } ctx := context.TODO() var uri *url.URL diff --git a/third-party-license.txt b/third-party-license.txt new file mode 100644 index 00000000000..fe06f1a57e6 --- /dev/null +++ b/third-party-license.txt @@ -0,0 +1,15 @@ +This project includes code from the go-sql-driver/mysql project. 
+ +https://github.com/go-sql-driver/mysql + +Homepage: https://github.com/go-sql-driver/mysql +License: Mozilla Public License Version 2.0 https://github.com/go-sql-driver/mysql/blob/master/LICENSE + +-------------------------------------------------------------------------------- + +This project includes code from the golang-lru project. + +https://github.com/hashicorp/golang-lru + +Homepage: https://github.com/hashicorp/golang-lru +License: Mozilla Public License Version 2.0 https://github.com/hashicorp/golang-lru/blob/master/LICENSE \ No newline at end of file