Skip to content

Commit

Permalink
feat: retry write points and print the retried points
Browse files Browse the repository at this point in the history
  • Loading branch information
shilinlee committed Oct 19, 2023
1 parent e0de62b commit 5ffd42a
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 36 deletions.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/bash
go mod tidy
go build -o dataMigrate ./src/
go build .
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module dataMigrate
module github.com/openGemini/dataMigrate

go 1.16

Expand Down
13 changes: 6 additions & 7 deletions src/main.go → main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,22 @@ package main

import (
"os"
)

var logger *Logger
"github.com/openGemini/dataMigrate/src"
)

func main() {
logger = NewLogger()
defer logger.close()
logger.LogString("Data migrate tool starting", TOCONSOLE, LEVEL_INFO)
defer src.Logger.Close()
src.Logger.LogString("Data migrate tool starting", src.TOCONSOLE, src.LEVEL_INFO)
if err := Run(os.Args[1:]...); err != nil {
logger.LogError(err)
src.Logger.LogError(err)
os.Exit(1)
}
}

func Run(args ...string) error {
if len(args) > 0 {
cmd := NewDataMigrateCommand()
cmd := src.NewDataMigrateCommand()
if err := cmd.Run(args...); err != nil {
return err
}
Expand Down
30 changes: 18 additions & 12 deletions src/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ the implementations of the "location" and "KeyCursor" classes in the above file
copyright 2023 Qizhi Huang([email protected])
*/

package main
package src

import (
"container/heap"
Expand Down Expand Up @@ -348,35 +348,41 @@ func (s *Scanner) writeBatches(c client.Client, cmd Migrator) error {
}

pt, err := s.nextPoint(cmd)

if err != nil {
logger.LogString("point read error: "+err.Error(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR)
return err
}

if pt == nil {
rowsNum := len(bp.Points())
err := c.Write(bp)
if err != nil {
logger.LogString("insert error: "+err.Error(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR)
return err
}
s.retryWrite(c, bp)
cmd.getStat().rowsRead += rowsNum
break
}

bp.AddPoint(pt)
count = count + 1
if count == cmd.getBatchSize() {
err := c.Write(bp)
if err != nil {
logger.LogString("insert error: "+err.Error(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR)
return err
}
s.retryWrite(c, bp)
cmd.getStat().rowsRead += cmd.getBatchSize()
flag = true
count = 0
}
}
return nil
}

func (s *Scanner) retryWrite(c client.Client, bp client.BatchPoints) {
for {
err := c.Write(bp)
if err == nil {
break
}
logger.LogString("insert error: "+err.Error(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR)
points := bp.Points()
if len(points) > 0 {
logger.LogString("retry for points like:"+points[0].String(), TOLOGFILE|TOCONSOLE, LEVEL_ERROR)
}
time.Sleep(3 * time.Second)
}
}
2 changes: 1 addition & 1 deletion src/dataMigrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Add insert values into openGemini
copyright 2023 Huawei Cloud Computing Technologies Co., Ltd.
*/

package main
package src

import (
"bytes"
Expand Down
2 changes: 1 addition & 1 deletion src/dataMigrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Remove write data to local file
Add insert values into openGemini
copyright 2023 Huawei Cloud Computing Technologies Co., Ltd.
*/
package main
package src

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion src/geminiservice.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package src

import (
client "github.com/influxdata/influxdb1-client/v2"
Expand Down
26 changes: 17 additions & 9 deletions src/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
copyright 2023 Qizhi Huang([email protected])
*/

package main
package src

import (
"log"
Expand Down Expand Up @@ -30,7 +30,7 @@ var LevelPrefixDict map[int]string = map[int]string{
LEVEL_ERROR: "ERROR: ",
}

type Logger struct {
type Log struct {
logDir string
logName string
fileWriter *os.File
Expand All @@ -40,12 +40,20 @@ type Logger struct {
debug bool
}

func NewLogger() *Logger {
var Logger *Log
var logger *Log

func init() {
Logger = NewLogger()
logger = Logger
}

func NewLogger() *Log {
var err error
tm := time.Unix(0, time.Now().UnixNano())
timestr := tm.Format("2006-01-02_15-04-05")
filename := "migrate_log_" + timestr + ".log"
l := &Logger{
l := &Log{
logDir: "./logs",
logName: filename,
debug: false,
Expand All @@ -69,15 +77,15 @@ func NewLogger() *Logger {
return l
}

func (l *Logger) SetDebug() {
func (l *Log) SetDebug() {
l.debug = true
}

func (l *Logger) IsDebug() bool {
func (l *Log) IsDebug() bool {
return l.debug
}

func (l *Logger) LogString(str string, target int, level int) {
func (l *Log) LogString(str string, target int, level int) {
if level == LEVEL_DEBUG {
if !l.debug {
return
Expand All @@ -96,12 +104,12 @@ func (l *Logger) LogString(str string, target int, level int) {
}
}

func (l *Logger) LogError(err error) {
func (l *Log) LogError(err error) {
l.fileLogger.Println("ERROR: ", err.Error())
l.errorLogger.Println("ERROR: ", err)
}

func (l *Logger) close() {
func (l *Log) Close() {
l.fileLogger = nil
l.fileWriter.Close()
l.consoleLogger = nil
Expand Down
7 changes: 4 additions & 3 deletions src/migrator.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package src

import (
"fmt"
Expand Down Expand Up @@ -201,7 +201,8 @@ func (m *migrator) writeCurrentFiles() error {
if measurement, ok = m.mstCache.Get(series); !ok {
measurement, tags, err = splitMeasurementAndTag(series)
if err != nil {
return err
logger.LogString(fmt.Sprintf("split measurement name and tag from %s, err: %s", series, err), TOLOGFILE, LEVEL_ERROR)
continue
}
m.mstCache.Add(series, measurement)
m.tagsCache.Add(series, tags)
Expand All @@ -226,7 +227,7 @@ func (m *migrator) writeCurrentFiles() error {
key: key,
seeks: m.locations(key, m.startTime, m.endTime),
}
if err := newCursor.init(); err != nil {
if err = newCursor.init(); err != nil {
return err
}
scanner.fields[f] = newCursor
Expand Down

0 comments on commit 5ffd42a

Please sign in to comment.