Skip to content

Commit

Permalink
session: support non-transactional insert statements (pingcap#38799)
Browse files Browse the repository at this point in the history
  • Loading branch information
you06 authored Nov 3, 2022
1 parent 74ac033 commit d7bd49a
Show file tree
Hide file tree
Showing 10 changed files with 4,038 additions and 3,583 deletions.
4 changes: 2 additions & 2 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func RegisterMetrics() {
prometheus.MustRegister(CPUProfileCounter)
prometheus.MustRegister(ReadFromTableCacheCounter)
prometheus.MustRegister(LoadTableCacheDurationHistogram)
prometheus.MustRegister(NonTransactionalDeleteCount)
prometheus.MustRegister(NonTransactionalDMLCount)
prometheus.MustRegister(MemoryUsage)
prometheus.MustRegister(StatsCacheLRUCounter)
prometheus.MustRegister(StatsCacheLRUGauge)
Expand Down Expand Up @@ -229,7 +229,7 @@ func ToggleSimplifiedMode(simplified bool) {
ReadFromTableCacheCounter,
TiFlashQueryTotalCounter,
CampaignOwnerCounter,
NonTransactionalDeleteCount,
NonTransactionalDMLCount,
MemoryUsage,
TokenGauge,
tikvmetrics.TiKVRawkvSizeHistogram,
Expand Down
7 changes: 4 additions & 3 deletions metrics/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,14 @@ var (
Help: "Counter of validating read ts by getting a timestamp from PD",
})

NonTransactionalDeleteCount = prometheus.NewCounter(
NonTransactionalDMLCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "session",
Name: "non_transactional_delete_count",
Name: "non_transactional_dml_count",
Help: "Counter of non-transactional delete",
})
}, []string{LblType},
)
TxnStatusEnteringCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Expand Down
5 changes: 4 additions & 1 deletion metrics/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,19 +325,22 @@ func GetTablePartitionCounter() TablePartitionUsageCounter {
// NonTransactionalStmtCounter records the usages of non-transactional statements.
type NonTransactionalStmtCounter struct {
DeleteCount int64 `json:"delete"`
InsertCount int64 `json:"insert"`
}

// Sub returns the difference of two counters.
func (n NonTransactionalStmtCounter) Sub(rhs NonTransactionalStmtCounter) NonTransactionalStmtCounter {
return NonTransactionalStmtCounter{
DeleteCount: n.DeleteCount - rhs.DeleteCount,
InsertCount: n.InsertCount - rhs.InsertCount,
}
}

// GetNonTransactionalStmtCounter gets the NonTransactionalStmtCounter.
func GetNonTransactionalStmtCounter() NonTransactionalStmtCounter {
return NonTransactionalStmtCounter{
DeleteCount: readCounter(NonTransactionalDeleteCount),
DeleteCount: readCounter(NonTransactionalDMLCount.With(prometheus.Labels{LblType: "delete"})),
InsertCount: readCounter(NonTransactionalDMLCount.With(prometheus.Labels{LblType: "insert"})),
}
}

Expand Down
38 changes: 38 additions & 0 deletions parser/ast/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -2197,6 +2197,43 @@ func (n *InsertStmt) Accept(v Visitor) (Node, bool) {
return v.Leave(n)
}

// WhereExpr implements ShardableDMLStmt interface.
func (n *InsertStmt) WhereExpr() ExprNode {
if n.Select == nil {
return nil
}
s, ok := n.Select.(*SelectStmt)
if !ok {
return nil
}
return s.Where
}

// SetWhereExpr implements ShardableDMLStmt interface.
func (n *InsertStmt) SetWhereExpr(e ExprNode) {
if n.Select == nil {
return
}
s, ok := n.Select.(*SelectStmt)
if !ok {
return
}
s.Where = e
}

// TableSource implements ShardableDMLStmt interface.
func (n *InsertStmt) TableSource() (*TableSource, bool) {
if n.Select == nil {
return nil, false
}
s, ok := n.Select.(*SelectStmt)
if !ok {
return nil, false
}
table, ok := s.From.TableRefs.Left.(*TableSource)
return table, ok
}

// DeleteStmt is a statement to delete rows from table.
// See https://dev.mysql.com/doc/refman/5.7/en/delete.html
type DeleteStmt struct {
Expand Down Expand Up @@ -2395,6 +2432,7 @@ type ShardableDMLStmt = interface {

var _ ShardableDMLStmt = &DeleteStmt{}
var _ ShardableDMLStmt = &UpdateStmt{}
var _ ShardableDMLStmt = &InsertStmt{}

type NonTransactionalDMLStmt struct {
dmlNode
Expand Down
7,028 changes: 3,515 additions & 3,513 deletions parser/parser.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions parser/parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -13850,6 +13850,7 @@ NonTransactionalDMLStmt:
ShardableStmt:
DeleteFromStmt
| UpdateStmt
| InsertIntoStmt

DryRunOptions:
{
Expand Down
27 changes: 27 additions & 0 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6920,6 +6920,7 @@ func TestCharsetIntroducer(t *testing.T) {

func TestNonTransactionalDML(t *testing.T) {
cases := []testCase{
// deletes
{"batch on c limit 10 delete from t where c = 10", true,
"BATCH ON `c` LIMIT 10 DELETE FROM `t` WHERE `c`=10"},
{"batch on c limit 10 dry run delete from t where c = 10", true,
Expand All @@ -6945,6 +6946,32 @@ func TestNonTransactionalDML(t *testing.T) {
"BATCH LIMIT 10 DRY RUN UPDATE `t` SET `c`=10"},
{"batch limit 10 dry run query update t set c = 10", true,
"BATCH LIMIT 10 DRY RUN QUERY UPDATE `t` SET `c`=10"},
// inserts
{"batch on c limit 10 insert into t1 select * from t2 where c = 10", true,
"BATCH ON `c` LIMIT 10 INSERT INTO `t1` SELECT * FROM `t2` WHERE `c`=10"},
{"batch on c limit 10 dry run insert into t1 select * from t2 where c = 10", true,
"BATCH ON `c` LIMIT 10 DRY RUN INSERT INTO `t1` SELECT * FROM `t2` WHERE `c`=10"},
{"batch on c limit 10 dry run query insert into t1 select * from t2 where c = 10", true,
"BATCH ON `c` LIMIT 10 DRY RUN QUERY INSERT INTO `t1` SELECT * FROM `t2` WHERE `c`=10"},
{"batch limit 10 insert into t1 select * from t2 where c = 10", true,
"BATCH LIMIT 10 INSERT INTO `t1` SELECT * FROM `t2` WHERE `c`=10"},
{"batch limit 10 dry run insert into t1 select * from t2 where c = 10", true,
"BATCH LIMIT 10 DRY RUN INSERT INTO `t1` SELECT * FROM `t2` WHERE `c`=10"},
{"batch limit 10 dry run query insert into t1 select * from t2 where c = 10", true,
"BATCH LIMIT 10 DRY RUN QUERY INSERT INTO `t1` SELECT * FROM `t2` WHERE `c`=10"},
// inserts on duplicate key update
{"batch on c limit 10 insert into t1 select * from t2 where c = 10 on duplicate key update t1.val = t2.val", true,
"BATCH ON `c` LIMIT 10 INSERT INTO `t1` SELECT * FROM `t2` WHERE `c`=10 ON DUPLICATE KEY UPDATE `t1`.`val`=`t2`.`val`"},
{"batch on c limit 10 dry run insert into t1 select * from t2 where c = 10 on duplicate key update t1.val = t2.val", true,
"BATCH ON `c` LIMIT 10 DRY RUN INSERT INTO `t1` SELECT * FROM `t2` WHERE `c`=10 ON DUPLICATE KEY UPDATE `t1`.`val`=`t2`.`val`"},
{"batch on c limit 10 dry run query insert into t1 select * from t2 where c = 10 on duplicate key update t1.val = t2.val", true,
"BATCH ON `c` LIMIT 10 DRY RUN QUERY INSERT INTO `t1` SELECT * FROM `t2` WHERE `c`=10 ON DUPLICATE KEY UPDATE `t1`.`val`=`t2`.`val`"},
{"batch limit 10 insert into t1 select * from t2 where c = 10 on duplicate key update t1.val = t2.val", true,
"BATCH LIMIT 10 INSERT INTO `t1` SELECT * FROM `t2` WHERE `c`=10 ON DUPLICATE KEY UPDATE `t1`.`val`=`t2`.`val`"},
{"batch limit 10 dry run insert into t1 select * from t2 where c = 10 on duplicate key update t1.val = t2.val", true,
"BATCH LIMIT 10 DRY RUN INSERT INTO `t1` SELECT * FROM `t2` WHERE `c`=10 ON DUPLICATE KEY UPDATE `t1`.`val`=`t2`.`val`"},
{"batch limit 10 dry run query insert into t1 select * from t2 where c = 10 on duplicate key update t1.val = t2.val", true,
"BATCH LIMIT 10 DRY RUN QUERY INSERT INTO `t1` SELECT * FROM `t2` WHERE `c`=10 ON DUPLICATE KEY UPDATE `t1`.`val`=`t2`.`val`"},
}

RunTest(t, cases, false)
Expand Down
1 change: 1 addition & 0 deletions session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ go_library(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_tipb//go-binlog",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
86 changes: 72 additions & 14 deletions session/nontransactional.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,18 @@ import (
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

// ErrNonTransactionalJobFailure is the error when a non-transactional job fails. The error is returned and following jobs are canceled.
var ErrNonTransactionalJobFailure = dbterror.ClassSession.NewStd(errno.ErrNonTransactionalJobFailure)

var (
nonTransactionalDeleteCount = metrics.NonTransactionalDMLCount.With(prometheus.Labels{metrics.LblType: "delete"})
nonTransactionalInsertCount = metrics.NonTransactionalDMLCount.With(prometheus.Labels{metrics.LblType: "insert"})
)

// job: handle keys in [start, end]
type job struct {
start types.Datum
Expand Down Expand Up @@ -75,6 +81,13 @@ func (j job) String(redacted bool) string {

// HandleNonTransactionalDML is the entry point for a non-transactional DML statement
func HandleNonTransactionalDML(ctx context.Context, stmt *ast.NonTransactionalDMLStmt, se Session) (sqlexec.RecordSet, error) {
sessVars := se.GetSessionVars()
originalReadStaleness := se.GetSessionVars().ReadStaleness
// NT-DML is a write operation, and should not be affected by read_staleness that is supposed to affect only SELECT.
sessVars.ReadStaleness = 0
defer func() {
sessVars.ReadStaleness = originalReadStaleness
}()
err := core.Preprocess(ctx, se, stmt)
if err != nil {
return nil, err
Expand Down Expand Up @@ -130,32 +143,80 @@ func checkConstraint(stmt *ast.NonTransactionalDMLStmt, se Session) error {

switch s := stmt.DMLStmt.(type) {
case *ast.DeleteStmt:
if s.TableRefs == nil || s.TableRefs.TableRefs == nil || s.TableRefs.TableRefs.Left == nil {
return errors.New("table reference is nil")
if err := checkTableRef(s.TableRefs); err != nil {
return err
}
if s.TableRefs.TableRefs.Right != nil {
return errors.New("Non-transactional delete doesn't support multiple tables")
if err := checkReadClauses(s.Limit, s.Order); err != nil {
return err
}
if s.Limit != nil {
return errors.New("Non-transactional delete doesn't support limit")
}
if s.Order != nil {
return errors.New("Non-transactional delete doesn't support order by")
}
metrics.NonTransactionalDeleteCount.Inc()
nonTransactionalDeleteCount.Inc()
case *ast.UpdateStmt:
// TODO: check: (1) single target table (2) more...
if s.Limit != nil {
return errors.New("Non-transactional update doesn't support limit")
}
// TODO: metrics
case *ast.InsertStmt:
if s.Select == nil {
return errors.New("Non-transactional insert supports insert select stmt only")
}
selectStmt, ok := s.Select.(*ast.SelectStmt)
if !ok {
return errors.New("Non-transactional insert doesn't support non-select source")
}
if err := checkTableRef(selectStmt.From); err != nil {
return err
}
if err := checkReadClauses(selectStmt.Limit, selectStmt.OrderBy); err != nil {
return err
}
sourceTable, ok := selectStmt.From.TableRefs.Left.(*ast.TableSource)
if !ok {
return errors.New("Non-transactional insert must have a source table")
}
sourceName, ok := sourceTable.Source.(*ast.TableName)
if !ok {
return errors.New("Non-transaction insert must have s source table")
}
targetTable, ok := s.Table.TableRefs.Left.(*ast.TableSource)
if !ok {
return errors.New("Non-transactional insert must have a target table")
}
targetName, ok := targetTable.Source.(*ast.TableName)
if !ok {
return errors.New("Non-transactional insert must have a target table")
}
if sourceName.Name.L == targetName.Name.L {
return errors.New("Non-transactional insert doesn't support self-insert")
}
nonTransactionalInsertCount.Inc()
default:
return errors.New("Unsupported DML type for non-transactional DML")
}

return nil
}

func checkTableRef(t *ast.TableRefsClause) error {
if t == nil || t.TableRefs == nil || t.TableRefs.Left == nil {
return errors.New("table reference is nil")
}
if t.TableRefs.Right != nil {
return errors.New("Non-transactional statements don't support multiple tables")
}
return nil
}

func checkReadClauses(limit *ast.Limit, order *ast.OrderByClause) error {
if limit != nil {
return errors.New("Non-transactional statements don't support limit")
}
if order != nil {
return errors.New("Non-transactional statements don't support order by")
}
return nil
}

// single-threaded worker. work on the key range [start, end]
func runJobs(ctx context.Context, jobs []job, stmt *ast.NonTransactionalDMLStmt,
tableName *ast.TableName, se Session, originalCondition ast.ExprNode) ([]string, error) {
Expand Down Expand Up @@ -358,11 +419,8 @@ func buildShardJobs(ctx context.Context, stmt *ast.NonTransactionalDMLStmt, se S
originalSelectLimit := se.GetSessionVars().SelectLimit
se.GetSessionVars().SelectLimit = math.MaxUint64
// NT-DML is a write operation, and should not be affected by read_staleness that is supposed to affect only SELECT.
originalReadStaleness := se.GetSessionVars().ReadStaleness
se.GetSessionVars().ReadStaleness = 0
rss, err := se.Execute(ctx, selectSQL)
se.GetSessionVars().SelectLimit = originalSelectLimit
se.GetSessionVars().ReadStaleness = originalReadStaleness

if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit d7bd49a

Please sign in to comment.