diff --git a/build.sh b/build.sh index 1c9a126..e7e8ec2 100755 --- a/build.sh +++ b/build.sh @@ -1,3 +1,3 @@ #!/bin/bash go mod tidy -go build -o dataMigrate ./src/ +go build . diff --git a/go.mod b/go.mod index ac2fc88..9cf1c5c 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module dataMigrate +module github.com/openGemini/dataMigrate go 1.16 diff --git a/src/main.go b/main.go similarity index 79% rename from src/main.go rename to main.go index fec47e3..f116c8e 100644 --- a/src/main.go +++ b/main.go @@ -16,23 +16,22 @@ package main import ( "os" -) -var logger *Logger + "github.com/openGemini/dataMigrate/src" +) func main() { - logger = NewLogger() - defer logger.close() - logger.LogString("Data migrate tool starting", TOCONSOLE, LEVEL_INFO) + defer src.Logger.Close() + src.Logger.LogString("Data migrate tool starting", src.TOCONSOLE, src.LEVEL_INFO) if err := Run(os.Args[1:]...); err != nil { - logger.LogError(err) + src.Logger.LogError(err) os.Exit(1) } } func Run(args ...string) error { if len(args) > 0 { - cmd := NewDataMigrateCommand() + cmd := src.NewDataMigrateCommand() if err := cmd.Run(args...); err != nil { return err } diff --git a/src/cursor.go b/src/cursor.go index 315eb24..81efe75 100644 --- a/src/cursor.go +++ b/src/cursor.go @@ -12,7 +12,7 @@ the implementations of the "location" and "KeyCursor" classes in the above file copyright 2023 Qizhi Huang(flaggyellow@qq.com) */ -package main +package src import ( "container/heap" @@ -348,7 +348,6 @@ func (s *Scanner) writeBatches(c client.Client, cmd Migrator) error { } pt, err := s.nextPoint(cmd) - if err != nil { logger.LogString("point read error: "+err.Error(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR) return err @@ -356,11 +355,7 @@ func (s *Scanner) writeBatches(c client.Client, cmd Migrator) error { if pt == nil { rowsNum := len(bp.Points()) - err := c.Write(bp) - if err != nil { - logger.LogString("insert error: "+err.Error(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR) - return err - } + s.retryWrite(c, bp) cmd.getStat().rowsRead += rowsNum break } @@ -368,11 +363,7 @@ func (s *Scanner) writeBatches(c client.Client, cmd Migrator) error { bp.AddPoint(pt) count = count + 1 if count == cmd.getBatchSize() { - err := c.Write(bp) - if err != nil { - logger.LogString("insert error: "+err.Error(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR) - return err - } + s.retryWrite(c, bp) cmd.getStat().rowsRead += cmd.getBatchSize() flag = true count = 0 @@ -380,3 +371,18 @@ func (s *Scanner) writeBatches(c client.Client, cmd Migrator) error { } return nil } + +func (s *Scanner) retryWrite(c client.Client, bp client.BatchPoints) { + for { + err := c.Write(bp) + if err == nil { + break + } + logger.LogString("insert error: "+err.Error(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR) + points := bp.Points() + if len(points) > 0 { + logger.LogString("retry for points like:"+points[0].String(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR) + } + time.Sleep(3 * time.Second) + } +} diff --git a/src/dataMigrate.go b/src/dataMigrate.go index a7b03b3..ba530e0 100644 --- a/src/dataMigrate.go +++ b/src/dataMigrate.go @@ -11,7 +11,7 @@ Add insert values into openGemini copyright 2023 Huawei Cloud Computing Technologies Co., Ltd. */ -package main +package src import ( "bytes" diff --git a/src/dataMigrate_test.go b/src/dataMigrate_test.go index 23a50f9..cf36f7e 100644 --- a/src/dataMigrate_test.go +++ b/src/dataMigrate_test.go @@ -7,7 +7,7 @@ Remove write data to local file Add insert values into openGemini copyright 2023 Huawei Cloud Computing Technologies Co., Ltd. */ -package main +package src import ( "fmt" diff --git a/src/geminiservice.go b/src/geminiservice.go index 73eb910..9202c97 100644 --- a/src/geminiservice.go +++ b/src/geminiservice.go @@ -1,4 +1,4 @@ -package main +package src import ( client "github.com/influxdata/influxdb1-client/v2" diff --git a/src/log.go b/src/log.go index e8037d6..5ebed1d 100644 --- a/src/log.go +++ b/src/log.go @@ -2,7 +2,7 @@ copyright 2023 Qizhi Huang(flaggyellow@qq.com) */ -package main +package src import ( "log" @@ -30,7 +30,7 @@ var LevelPrefixDict map[int]string = map[int]string{ LEVEL_ERROR: "ERROR: ", } -type Logger struct { +type Log struct { logDir string logName string fileWriter *os.File @@ -40,12 +40,20 @@ type Logger struct { debug bool } -func NewLogger() *Logger { +var Logger *Log +var logger *Log + +func init() { + Logger = NewLogger() + logger = Logger +} + +func NewLogger() *Log { var err error tm := time.Unix(0, time.Now().UnixNano()) timestr := tm.Format("2006-01-02_15-04-05") filename := "migrate_log_" + timestr + ".log" - l := &Logger{ + l := &Log{ logDir: "./logs", logName: filename, debug: false, @@ -69,15 +77,15 @@ func NewLogger() *Logger { return l } -func (l *Logger) SetDebug() { +func (l *Log) SetDebug() { l.debug = true } -func (l *Logger) IsDebug() bool { +func (l *Log) IsDebug() bool { return l.debug } -func (l *Logger) LogString(str string, target int, level int) { +func (l *Log) LogString(str string, target int, level int) { if level == LEVEL_DEBUG { if !l.debug { return @@ -96,12 +104,12 @@ func (l *Logger) LogString(str string, target int, level int) { } } -func (l *Logger) LogError(err error) { +func (l *Log) LogError(err error) { l.fileLogger.Println("ERROR: ", err.Error()) l.errorLogger.Println("ERROR: ", err) } -func (l *Logger) close() { +func (l *Log) Close() { l.fileLogger = nil l.fileWriter.Close() l.consoleLogger = nil diff --git a/src/migrator.go b/src/migrator.go index 1e15741..af2ab18 100644 --- a/src/migrator.go +++ b/src/migrator.go @@ -1,4 +1,4 @@ -package main +package src import ( "fmt" @@ -201,7 +201,8 @@ func (m *migrator) writeCurrentFiles() error { if measurement, ok = m.mstCache.Get(series); !ok { measurement, tags, err = splitMeasurementAndTag(series) if err != nil { - return err + logger.LogString(fmt.Sprintf("split measurement name and tag from %s, err: %s", series, err), TOLOGFILE, LEVEL_ERROR) + continue } m.mstCache.Add(series, measurement) m.tagsCache.Add(series, tags) @@ -226,7 +227,7 @@ func (m *migrator) writeCurrentFiles() error { key: key, seeks: m.locations(key, m.startTime, m.endTime), } - if err := newCursor.init(); err != nil { + if err = newCursor.init(); err != nil { return err } scanner.fields[f] = newCursor