forked from pingcap/dm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
inject_sql.go
108 lines (98 loc) · 3.12 KB
/
inject_sql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package syncer
import (
"context"
"time"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/siddontang/go-mysql/replication"
"go.uber.org/zap"
"github.com/pingcap/dm/pkg/binlog"
parserpkg "github.com/pingcap/dm/pkg/parser"
"github.com/pingcap/dm/pkg/terror"
)
// InjectSQLs injects DDL statements into the syncer as binlog events, to be
// picked up while the syncer meets an xid/query event.
//
// All statements in sqls are validated before anything is injected: each one
// must parse, must be a DDL statement, and must name its schema explicitly.
// The first statement that fails validation aborts the call with an error and
// nothing is sent. Each validated statement is then wrapped in an incomplete
// QueryEvent and sent to s.injectEventCh, waiting at most 30 seconds per
// statement. If that wait times out, or ctx is done first, the context error
// is returned; events already sent by this call are not withdrawn.
//
// TODO: let user to specify special xid/query event position
// TODO: inject dml sqls
func (s *Syncer) InjectSQLs(ctx context.Context, sqls []string) error {
	// verify and fetch schema name
	schemas := make([]string, 0, len(sqls))
	parser2 := parser.New()
	for _, sql := range sqls {
		node, err := parser2.ParseOneStmt(sql, "", "")
		if err != nil {
			return terror.Annotatef(terror.ErrSyncerUnitParseStmt.New(err.Error()), "sql %s", sql)
		}

		ddlNode, ok := node.(ast.DDLNode)
		if !ok {
			return terror.ErrSyncerUnitInjectDDLOnly.Generate(sql)
		}

		tableNames, err := parserpkg.FetchDDLTableNames("", ddlNode)
		if err != nil {
			return err
		}
		// Check the length before indexing: an empty result carries no schema
		// either, so report it the same way instead of panicking.
		if len(tableNames) == 0 || len(tableNames[0].Schema) == 0 {
			return terror.ErrSyncerUnitInjectDDLWithoutSchema.Generate(sql)
		}
		schemas = append(schemas, tableNames[0].Schema)
	}

	for i, sql := range sqls {
		schema := schemas[i]
		ev := genIncompleteQueryEvent([]byte(schema), []byte(sql))
		// Bound the wait for each statement separately.
		newCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
		s.tctx.L().Info("injecting sql", zap.String("sql", sql), zap.String("schema", schema))
		select {
		case s.injectEventCh <- ev:
		case <-newCtx.Done():
			// Read the error before cancel so a timeout is still reported as
			// DeadlineExceeded; cancel is called explicitly (not deferred) so
			// the timer is released per iteration.
			err := newCtx.Err()
			cancel()
			return err
		}
		cancel()
	}
	return nil
}
// tryInject returns a pending injected binlog event for the given operation,
// or nil if there is none. Only xid and ddl operations are injection points;
// for any other op, or when nothing is waiting in s.injectEventCh, it returns
// nil without blocking.
func (s *Syncer) tryInject(op opType, location binlog.Location) *replication.BinlogEvent {
	isInjectPoint := op == xid || op == ddl
	if !isInjectPoint {
		return nil
	}

	// non-blocking receive from the extra binlog event chan
	var injected *replication.BinlogEvent
	select {
	case injected = <-s.injectEventCh:
	default:
		return nil
	}

	// NOTE: EventSize is simply set to 0 for now, so the event's start and end
	// positions are the same.
	// TODO: support GTID
	hdr := injected.Header
	hdr.LogPos = location.Position.Pos
	hdr.EventSize = 0
	s.tctx.L().Info("inject binlog event from inject chan", zap.Reflect("header", injected.Header), zap.Reflect("event", injected.Event))
	return injected
}
// genIncompleteQueryEvent builds a BinlogEvent wrapping an incomplete
// QueryEvent: only the header's EventType (QUERY_EVENT) and the event's
// Schema and Query fields are set; every other field keeps its zero value.
// For now it is only used to generate a QueryEvent that forces a sharding
// group to be synced.
// NOTE: use it only if you know what you are doing.
func genIncompleteQueryEvent(schema, query []byte) *replication.BinlogEvent {
	return &replication.BinlogEvent{
		Header: &replication.EventHeader{
			EventType: replication.QUERY_EVENT,
		},
		Event: &replication.QueryEvent{
			Schema: schema,
			Query:  query,
		},
	}
}