forked from pingcap/dm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
status.go
91 lines (80 loc) · 2.67 KB
/
status.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package syncer
import (
"github.com/siddontang/go-mysql/mysql"
"go.uber.org/zap"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/utils"
)
// Status implements SubTaskUnit.Status.
// It assembles a *pb.SyncStatus snapshot from values the syncer already holds
// (event counters, TPS figures, the flushed checkpoint) plus the upstream
// master status; it does not calculate the statistics itself.
//
// Failures while fetching the master status or parsing the syncer binlog
// position are only logged, never returned, so the result is always a non-nil
// *pb.SyncStatus that may have been built from whatever values those calls
// handed back.
func (s *Syncer) Status() interface{} {
	var (
		masterPos     mysql.Position
		masterGTIDSet gtid.Set
	)

	total := s.count.Get()
	totalTps := s.totalTps.Get()
	tps := s.tps.Get()

	// Best effort: on failure the error is logged and the returned values are
	// still used to fill in the status below.
	masterPos, masterGTIDSet, err := s.getMasterStatus()
	if err != nil {
		s.tctx.L().Warn("fail to get master status", zap.Error(err))
	}

	// FlushedGlobalPoint yields a single value and no error, so there is
	// nothing to check here. (Re-testing the err from getMasterStatus at this
	// point would only log the same failure again under a misleading message.)
	syncerLocation := s.checkpoint.FlushedGlobalPoint()

	st := &pb.SyncStatus{
		TotalEvents:  total,
		TotalTps:     totalTps,
		RecentTps:    tps,
		MasterBinlog: masterPos.String(),
		SyncerBinlog: syncerLocation.Position.String(),
	}
	if masterGTIDSet != nil { // masterGTIDSet maybe a nil interface
		st.MasterBinlogGtid = masterGTIDSet.String()
	}
	if syncerLocation.GTIDSet != nil {
		st.SyncerBinlogGtid = syncerLocation.GTIDSet.String()
	}

	st.BinlogType = "unknown"
	if s.streamerController != nil {
		st.BinlogType = binlogTypeToString(s.streamerController.GetBinlogType())
	}

	if s.cfg.EnableGTID {
		// In GTID mode the task counts as synced only when both GTID sets are
		// known and equal.
		if masterGTIDSet != nil && syncerLocation.GTIDSet != nil && masterGTIDSet.Equal(syncerLocation.GTIDSet) {
			st.Synced = true
		}
	} else {
		// If a syncer unit is waiting for relay log catch up, it has not executed
		// LoadMeta and will return a parsed binlog name error. As we can find mysql
		// position in syncer status, we record this error only in debug level.
		// The real position is only needed for this position-based comparison,
		// so it is resolved here rather than unconditionally.
		realPos, parseErr := binlog.RealMySQLPos(syncerLocation.Position)
		if parseErr != nil {
			s.tctx.L().Debug("fail to parse real mysql position", zap.Stringer("position", syncerLocation.Position), log.ShortError(parseErr))
		}
		st.Synced = utils.CompareBinlogPos(masterPos, realPos, 0) == 0
	}

	if s.cfg.IsSharding {
		st.UnresolvedGroups = s.sgk.UnresolvedGroups()
	}

	// Report the DDLs of the pending shard info, if there is one.
	if pendingShardInfo := s.pessimist.PendingInfo(); pendingShardInfo != nil {
		st.BlockingDDLs = pendingShardInfo.DDLs
	}

	return st
}