Skip to content

Commit

Permalink
filter (ticdc): add a lock to protect the parser from being called co…
Browse files Browse the repository at this point in the history
…ncurrently. (#9577)

close #9571
  • Loading branch information
asddongmen authored Aug 21, 2023
1 parent f3984a5 commit 8d3997a
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 4 deletions.
18 changes: 14 additions & 4 deletions pkg/filter/sql_event_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package filter

import (
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/log"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
Expand Down Expand Up @@ -95,13 +97,19 @@ func verifyIgnoreEvents(types []bf.EventType) error {

// sqlEventFilter is a filter that filters DDL/DML event by its type or query.
type sqlEventFilter struct {
p *parser.Parser
rules []*sqlEventRule
// Please be careful, parser.Parser is not thread safe.
pLock sync.Mutex
// Currently, parser is only used to parse ddl query.
// So we can use a lock to protect it.
// If we want to use it to parse dml query in the future,
// we should create a parser for each goroutine.
ddlParser *parser.Parser
rules []*sqlEventRule
}

func newSQLEventFilter(cfg *config.FilterConfig) (*sqlEventFilter, error) {
res := &sqlEventFilter{
p: parser.New(),
ddlParser: parser.New(),
}
for _, rule := range cfg.EventFilters {
if err := res.addRule(rule); err != nil {
Expand Down Expand Up @@ -143,7 +151,9 @@ func (f *sqlEventFilter) shouldSkipDDL(
log.Info("sql event filter handle ddl event",
zap.Any("ddlType", ddlType), zap.String("schema", schema),
zap.String("table", table), zap.String("query", query))
evenType, err := ddlToEventType(f.p, query, ddlType)
f.pLock.Lock()
evenType, err := ddlToEventType(f.ddlParser, query, ddlType)
f.pLock.Unlock()
if err != nil {
return false, err
}
Expand Down
58 changes: 58 additions & 0 deletions pkg/filter/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package filter
import (
"testing"

"github.com/pingcap/log"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb/parser"
timodel "github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -116,3 +117,60 @@ func TestDDLToEventType(t *testing.T) {
require.Equal(t, c.eventType, et, "case%v", c.ddl)
}
}

func TestDDLToTypeSpecialDDL(t *testing.T) {
type c struct {
ddl string
jobType timodel.ActionType
evenType bf.EventType
err error
}

ddlWithTab := `CREATE TABLE if not exists sbtest25
(
id bigint NOT NULL,
k bigint NOT NULL DEFAULT '0',
c char(30) NOT NULL DEFAULT '',
pad char(20) NOT NULL DEFAULT '',
PRIMARY KEY (id),
KEY k_1 (k)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin`
ddlWithTwoTab := ` CREATE TABLE if not exists sbtest25
(
id bigint NOT NULL,
k bigint NOT NULL DEFAULT '0',
c char(30) NOT NULL DEFAULT '',
pad char(20) NOT NULL DEFAULT '',
PRIMARY KEY (id),
KEY k_1 (k)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin`
ddlWithNewLine := `CREATE TABLE finish_mark
(
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL)`

cases := []c{
{"CREATE DATABASE test", timodel.ActionCreateSchema, bf.CreateDatabase, nil},
{ddlWithTwoTab, timodel.ActionCreateTable, bf.CreateTable, nil},
{ddlWithTab, timodel.ActionCreateTable, bf.CreateTable, nil},
{ddlWithNewLine, timodel.ActionCreateTable, bf.CreateTable, nil},
}
p := parser.New()
for _, c := range cases {
log.Info(c.ddl)
et, err := ddlToEventType(p, c.ddl, c.jobType)
if c.err != nil {
errRFC, ok := cerror.RFCCode(err)
require.True(t, ok)
caseErrRFC, ok := cerror.RFCCode(c.err)
require.True(t, ok)
require.Equal(t, caseErrRFC, errRFC)
} else {
require.NoError(t, err)
}
require.Equal(t, c.evenType, et, "case%v", c.ddl)
}
}

0 comments on commit 8d3997a

Please sign in to comment.