forked from GoDannyLai/binlog_rollback
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
69 lines (55 loc) · 2.02 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package main
import (
"sync"
"github.com/siddontang/go-mysql/replication"
)
// main is the program entry point. It sets up the logger and the command-line
// configuration, starts the consumer goroutines (statistics, optional
// original-SQL output and, unless the work type is "stats", the SQL generators
// plus their writer), and then parses binlog events either from a replication
// stream ("repl" mode) or from binlog files ("file" mode).
//
// NOTE(review): eventChan, statChan and orgSqlChan are never closed in this
// function; presumably the parser functions close them once parsing is done —
// confirm, since the final waits below depend on the consumers returning.
func main() {
	gLogger.CreateNewRawLogger()
	gConfCmd.IfSetStopParsPoint = false
	gConfCmd.ParseCmdOptions()
	GetTblDefFromDbAndMergeAndDump(gConfCmd)

	if gConfCmd.WorkType != "stats" {
		G_HandlingBinEventIndex = &BinEventHandlingIndx{EventIdx: 1, finished: false}
	}

	// Every pipeline channel is buffered with two slots per worker thread.
	queueLen := gConfCmd.Threads * 2
	binEvents := make(chan MyBinEvent, queueLen)
	eventStats := make(chan BinEventStats, queueLen)
	originalSQL := make(chan OrgSqlPrint, queueLen)
	generatedSQL := make(chan ForwardRollbackSqlOfPrint, queueLen)

	var (
		writers    sync.WaitGroup // statistics, original-SQL and generated-SQL writers
		generators sync.WaitGroup // forward/rollback SQL generator workers
	)

	// Result files for the statistics consumer; closed when main returns.
	statFile, ddlFile, bigLongFile := OpenStatsResultFiles(gConfCmd)
	defer statFile.Close()
	defer ddlFile.Close()
	defer bigLongFile.Close()

	writers.Add(1)
	go ProcessBinEventStats(statFile, ddlFile, bigLongFile, gConfCmd, eventStats, &writers)

	if gConfCmd.IfWriteOrgSql {
		writers.Add(1)
		go PrintOrgSqlToFile(gConfCmd.OutputDir, originalSQL, &writers)
	}

	if gConfCmd.WorkType != "stats" {
		// Writer that dumps the generated forward or rollback SQL to file.
		writers.Add(1)
		go PrintExtraInfoForForwardRollbackupSql(gConfCmd, generatedSQL, &writers)

		// Workers that turn binlog events into forward or rollback SQL;
		// worker ids start at 1.
		for workerID := uint(1); workerID <= gConfCmd.Threads; workerID++ {
			generators.Add(1)
			go GenForwardRollbackSqlFromBinEvent(workerID, gConfCmd, binEvents, generatedSQL, &generators)
		}
	}

	switch gConfCmd.Mode {
	case "repl":
		ParserAllBinEventsFromRepl(gConfCmd, binEvents, eventStats, originalSQL)
	case "file":
		binlogParser := replication.NewBinlogParser()
		binlogParser.SetTimestampStringLocation(gBinlogTimeLocation)
		// Keep MySQL datetime/time columns as strings instead of parsing
		// them into Go time values.
		binlogParser.SetParseTime(false)
		// The SQL builder does not support the decimal type.
		binlogParser.SetUseDecimal(false)

		fileParser := BinFileParser{}
		fileParser.parser = binlogParser
		fileParser.MyParseAllBinlogFiles(gConfCmd, binEvents, eventStats, originalSQL)
	}

	// Once every generator has returned nothing else sends on generatedSQL,
	// so it can be closed here before waiting for the writers.
	generators.Wait()
	close(generatedSQL)
	writers.Wait()
}