diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index ac491b268..6a379d576 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -125,6 +125,14 @@ Why is this behavior configurable? Different workloads have different characteri Noteworthy is that setting `--dml-batch-size` to higher value _does not_ mean `gh-ost` blocks or waits on writes. The batch size is an upper limit on transaction size, not a minimal one. If `gh-ost` doesn't have "enough" events in the pipe, it does not wait on the binary log, it just writes what it already has. This conveniently suggests that if write load is light enough for `gh-ost` to only see a few events in the binary log at a given time, then it is also light enough for `gh-ost` to apply a fraction of the batch size. +### ignore-over-iteration-range-max-binlog +When binlog unique key value is over `MigrationIterationRangeMaxValues`, and less than `MigrationRangeMaxValues`, the binlog will be ignored. Because the data will be synced by copy chunk. +When binlog unique key value is over `MigrationRangeMaxValues` or less than `MigrationIterationRangeMaxValues`, the binlog will be applied. + +### is-merge-dml-event +When is-merge-dml-event is `true`, and the chunk unique key is int or float type. the sync binlog event while be merged into map, and the key is unique key value. +Then traverse the map, merge all delete operations into one `delete sql`, merge all insert and update operations into `replace sql`, and then execute. + ### exact-rowcount A `gh-ost` execution need to copy whatever rows you have in your existing table onto the ghost table. This can and often will be, a large number. Exactly what that number is? diff --git a/go/base/context.go b/go/base/context.go index 300ec1201..fd9aae97f 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -194,7 +194,10 @@ type MigrationContext struct { controlReplicasLagResult mysql.ReplicationLagResult TotalRowsCopied int64 TotalDMLEventsApplied int64 + TotalDMLEventsIgnored int64 DMLBatchSize int64 + IgnoreOverIterationRangeMaxBinlog bool + IsMergeDMLEvents bool isThrottled bool throttleReason string throttleReasonHint ThrottleReasonHint diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 139703077..d0feba2b6 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -108,6 +108,8 @@ func main() { defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking") cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)") niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after") + flag.BoolVar(&migrationContext.IgnoreOverIterationRangeMaxBinlog, "ignore-over-iteration-range-max-binlog", false, "When binlog unique key value is over MigrationIterationRangeMaxValues, and less than MigrationRangeMaxValues, the binlog will be ignored. Because the data will be synced by copy chunk") + flag.BoolVar(&migrationContext.IsMergeDMLEvents, "is-merge-dml-event", false, "Merge DML Binlog Event") maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation") replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query") diff --git a/go/logic/applier.go b/go/logic/applier.go index fa374a70f..60a53366a 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -27,18 +27,22 @@ const ( ) type dmlBuildResult struct { - query string - args []interface{} - rowsDelta int64 - err error + dml binlog.EventDML + query string + args []interface{} + uniqueValues []interface{} + rowsDelta int64 + err error } -func newDmlBuildResult(query string, args []interface{}, rowsDelta int64, err error) *dmlBuildResult { +func newDmlBuildResult(dml binlog.EventDML, query string, args []interface{}, uniqueValues []interface{}, rowsDelta int64, err error) *dmlBuildResult { return &dmlBuildResult{ - query: query, - args: args, - rowsDelta: rowsDelta, - err: err, + dml: dml, + query: query, + args: args, + uniqueValues: uniqueValues, + rowsDelta: rowsDelta, + err: err, } } @@ -1110,12 +1114,22 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (result case binlog.DeleteDML: { query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues()) - return append(results, newDmlBuildResult(query, uniqueKeyArgs, -1, err)) + if ignore, err := this.isIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs); err != nil { + return append(results, newDmlBuildResultError(fmt.Errorf("Check isIgnoreOverMaxChunkRangeEvent error: %+v", err))) + } else if ignore { + return results + } + return append(results, newDmlBuildResult(binlog.DeleteDML, query, uniqueKeyArgs, uniqueKeyArgs, -1, err)) } case binlog.InsertDML: { - query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues()) - return append(results, newDmlBuildResult(query, sharedArgs, 1, err)) + query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues()) + if ignore, err := this.isIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs); err != nil { + return append(results, newDmlBuildResultError(fmt.Errorf("Check isIgnoreOverMaxChunkRangeEvent error: %+v", err))) + } else if ignore { + return results + } + return append(results, newDmlBuildResult(binlog.InsertDML, query, sharedArgs, uniqueKeyArgs, 1, err)) } case binlog.UpdateDML: { @@ -1127,20 +1141,114 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (result return results } query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues()) + if ignore, err := this.isIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs); err != nil { + return append(results, newDmlBuildResultError(fmt.Errorf("Check isIgnoreOverMaxChunkRangeEvent error: %+v", err))) + } else if ignore { + return results + } args := sqlutils.Args() args = append(args, sharedArgs...) args = append(args, uniqueKeyArgs...) - return append(results, newDmlBuildResult(query, args, 0, err)) + return append(results, newDmlBuildResult(binlog.UpdateDML, query, args, uniqueKeyArgs, 0, err)) } } return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))) } +func (this *Applier) generateDeleteQuery(uniqueKeyValuesList [][]string) string { + var stmt string + if len(uniqueKeyValuesList) == 0 { + return stmt + } + + var whereClause string + for _, uniqueKeyValues := range uniqueKeyValuesList { + if uniqueKeyValues == nil || len(uniqueKeyValues) == 0 { + continue + } + _clause := "" + for _, val := range uniqueKeyValues { + if _clause == "" { + _clause = fmt.Sprintf("%v", val) + continue + } + _clause += fmt.Sprintf(", %v", val) + } + + if whereClause == "" { + whereClause = fmt.Sprintf("(%s)", _clause) + continue + } + whereClause = fmt.Sprintf("%s, (%s)", whereClause, _clause) + } + + stmt = fmt.Sprintf(` + DELETE /* gh-ost %s.%s */ + FROM %s.%s + WHERE (%s) IN (%s)`, + sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetGhostTableName()), + this.migrationContext.UniqueKey.Columns.EscapeString(), + whereClause, + ) + return stmt +} + +func (this *Applier) generateReplaceQuery(uniqueKeyValuesList [][]string) string { + var stmt string + if len(uniqueKeyValuesList) == 0 { + return stmt + } + + var whereClause string + for _, uniqueKeyValues := range uniqueKeyValuesList { + if uniqueKeyValues == nil || len(uniqueKeyValues) == 0 { + continue + } + _clause := "" + for _, val := range uniqueKeyValues { + if _clause == "" { + _clause = fmt.Sprintf("%v", val) + continue + } + _clause += fmt.Sprintf(", %v", val) + } + + if whereClause == "" { + whereClause = fmt.Sprintf(`(%s)`, _clause) + continue + } + whereClause = fmt.Sprintf(`%s, (%s)`, whereClause, _clause) + } + + stmt = fmt.Sprintf(` + REPLACE /* gh-ost %s.%s */ + INTO %s.%s (%s) + SELECT %s + FROM %s.%s + FORCE INDEX (%s) + WHERE (%s) IN (%s)`, + sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetGhostTableName()), this.migrationContext.SharedColumns.EscapeString(), + this.migrationContext.SharedColumns.EscapeString(), + sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.UniqueKey.Name), + this.migrationContext.UniqueKey.Columns.EscapeString(), + whereClause, + ) + + return stmt +} + // ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error { var totalDelta int64 + var dmlEventSize int64 + var ignoredEventSize int64 - err := func() error { + var err error + + dbTxFunc := func(applyFunc func(*gosql.Tx) error) error { tx, err := this.db.Begin() if err != nil { return err @@ -1157,38 +1265,140 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) if _, err := tx.Exec(sessionQuery); err != nil { return rollback(err) } + + if err := applyFunc(tx); err != nil { + return rollback(err) + } + + if err := tx.Commit(); err != nil { + return err + } + return nil + } + + resultFunc := func(result gosql.Result, delta int64) { + if result == nil { + return + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + log.Warningf("error getting rows affected from DML event query: %s. i'm going to assume that the DML affected a single row, but this may result in inaccurate statistics", err) + rowsAffected = 1 + } + totalDelta += delta * rowsAffected + } + + applyMapFunc := func(tx *gosql.Tx) error { + dmlMap := make(map[string]*dmlBuildResult) + const ValSep = "#gho#" for _, dmlEvent := range dmlEvents { - for _, buildResult := range this.buildDMLEventQuery(dmlEvent) { + buildResults := this.buildDMLEventQuery(dmlEvent) + if len(buildResults) == 0 { + ignoredEventSize++ + continue + } + dmlEventSize++ + for _, buildResult := range buildResults { if buildResult.err != nil { - return rollback(buildResult.err) + return buildResult.err } - result, err := tx.Exec(buildResult.query, buildResult.args...) + + values, err := this.migrationContext.UniqueKey.FormatValues(buildResult.uniqueValues) if err != nil { - err = fmt.Errorf("%w; query=%s; args=%+v", err, buildResult.query, buildResult.args) - return rollback(err) + return err } + dmlMap[strings.Join(values, ValSep)] = buildResult + } + } + delArgs := make([][]string, 0) + uptArgs := make([][]string, 0) + insArgs := make([][]string, 0) + for key, buildResult := range dmlMap { + if buildResult == nil { + continue + } + values := strings.Split(key, ValSep) + switch buildResult.dml { + case binlog.DeleteDML: + delArgs = append(delArgs, values) + case binlog.InsertDML: + insArgs = append(insArgs, values) + case binlog.UpdateDML: + uptArgs = append(uptArgs, values) + default: + return fmt.Errorf("dost not support dml event %s", buildResult.dml) + } + } + + if len(delArgs) > 0 { + query := this.generateDeleteQuery(delArgs) + result, err := tx.Exec(query) + if err != nil { + err = fmt.Errorf("%w; query=%s", err, query) + return err + } + resultFunc(result, -1) + } + + if len(insArgs) > 0 { + query := this.generateReplaceQuery(insArgs) + result, err := tx.Exec(query) + if err != nil { + err = fmt.Errorf("%w; query=%s", err, query) + return err + } + resultFunc(result, 1) + } + + if len(uptArgs) > 0 { + query := this.generateReplaceQuery(uptArgs) + _, err := tx.Exec(query) + if err != nil { + err = fmt.Errorf("%w; query=%s", err, query) + return err + } + } + return nil + } - rowsAffected, err := result.RowsAffected() + applyAllFunc := func(tx *gosql.Tx) error { + for _, dmlEvent := range dmlEvents { + buildResults := this.buildDMLEventQuery(dmlEvent) + if len(buildResults) == 0 { + ignoredEventSize++ + continue + } + dmlEventSize++ + for _, buildResult := range buildResults { + if buildResult.err != nil { + return buildResult.err + } + result, err := tx.Exec(buildResult.query, buildResult.args...) if err != nil { - log.Warningf("error getting rows affected from DML event query: %s. i'm going to assume that the DML affected a single row, but this may result in inaccurate statistics", err) - rowsAffected = 1 + err = fmt.Errorf("%w; query=%s; args=%+v", err, buildResult.query, buildResult.args) + return err } - // each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1). - // multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event - totalDelta += buildResult.rowsDelta * rowsAffected + + resultFunc(result, buildResult.rowsDelta) } } - if err := tx.Commit(); err != nil { - return err - } return nil - }() + } + + // IsMergeDMLEvents is enabled and unique key is memory comparable and unique key has only one column + if this.migrationContext.IsMergeDMLEvents && this.migrationContext.UniqueKey.IsMemoryComparable && this.migrationContext.UniqueKey.Len() == 1 { + err = dbTxFunc(applyMapFunc) + } else { + err = dbTxFunc(applyAllFunc) + } if err != nil { return this.migrationContext.Log.Errore(err) } // no error - atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, int64(len(dmlEvents))) + atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, dmlEventSize) + atomic.AddInt64(&this.migrationContext.TotalDMLEventsIgnored, ignoredEventSize) if this.migrationContext.CountTableRows { atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, totalDelta) } @@ -1202,3 +1412,69 @@ func (this *Applier) Teardown() { this.singletonDB.Close() atomic.StoreInt64(&this.finishedMigrating, 1) } + +// isIgnoreOverMaxChunkRangeEvent returns true if this event should be ignored +// min rangeMax max +// the value > rangeMax and value < max, ignore = true +// otherwise ignore = false +func (this *Applier) isIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs []interface{}) (bool, error) { + if !this.migrationContext.IgnoreOverIterationRangeMaxBinlog { + return false, nil + } + + // Compare whether it is greater than or equal to the maximum boundary value. If it is, it cannot be ignored and the corresponding binlog needs to be applied. + ignore, err := func() (bool, error) { + for order, uniqueKeyCol := range this.migrationContext.UniqueKey.Columns.Columns() { + if uniqueKeyCol.CompareValueFunc == nil { + return false, nil + } + + than, err := uniqueKeyCol.CompareValueFunc(uniqueKeyArgs[order], this.migrationContext.MigrationRangeMaxValues.StringColumn(order)) + if err != nil { + return false, err + } + if than < 0 { + return true, nil + } else if than > 0 { + return false, nil + } + } + + // When it is equal to the boundary value, it cannot be ignored. + return false, nil + }() + if err != nil { + return false, err + } + + if !ignore { + return false, nil + } + + // Compare whether it exceeds the boundary value of IterationRangeMax. If it is greater, it can be ignored, if it is less, it cannot be ignored. + ignore, err = func() (bool, error) { + if this.migrationContext.MigrationIterationRangeMaxValues == nil { + return true, nil + } + + for order, uniqueKeyCol := range this.migrationContext.UniqueKey.Columns.Columns() { + than, err := uniqueKeyCol.CompareValueFunc(uniqueKeyArgs[order], this.migrationContext.MigrationIterationRangeMaxValues.StringColumn(order)) + if err != nil { + return false, err + } + if than > 0 { + return true, nil + } else if than < 0 { + return false, nil + } + } + + //When it is equal to the boundary value, it cannot be ignored. + return false, nil + }() + if err != nil { + return false, err + } + + return ignore, nil +} diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 36c28d9e8..acb2d7813 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -6,6 +6,8 @@ package logic import ( + "fmt" + "math/big" "strings" "testing" @@ -183,3 +185,178 @@ func TestApplierInstantDDL(t *testing.T) { test.S(t).ExpectEquals(stmt, "ALTER /* gh-ost */ TABLE `test`.`mytable` ADD INDEX (foo), ALGORITHM=INSTANT") }) } + +func TestGenerateQuery(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrationContext.DatabaseName = "test" + migrationContext.OriginalTableName = "mytable" + uniqueColumns := sql.NewColumnList([]string{"id", "order_id"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY KEY", + Columns: *uniqueColumns, + } + sharedColumns := sql.NewColumnList([]string{"id", "order_id", "name", "age"}) + migrationContext.SharedColumns = sharedColumns + + applier := NewApplier(migrationContext) + + t.Run("generateDeleteQuery1", func(t *testing.T) { + stmt := applier.generateDeleteQuery([][]string{{"1", "2"}}) + test.S(t).ExpectEquals(stmt, ` + DELETE /* gh-ost `+"`test`.`mytable`"+` */ + FROM `+"`test`.`_mytable_gho`"+` + WHERE `+"(`id`,`order_id`)"+` IN ((1, 2))`) + }) + t.Run("generateDeleteQuery2", func(t *testing.T) { + stmt := applier.generateDeleteQuery([][]string{{"'1'", "'2'"}}) + test.S(t).ExpectEquals(stmt, ` + DELETE /* gh-ost `+"`test`.`mytable`"+` */ + FROM `+"`test`.`_mytable_gho`"+` + WHERE `+"(`id`,`order_id`)"+` IN (('1', '2'))`) + }) + t.Run("generateDeleteQuery3", func(t *testing.T) { + stmt := applier.generateDeleteQuery([][]string{{"'1'", "'2'"}, {"1", "23"}}) + test.S(t).ExpectEquals(stmt, ` + DELETE /* gh-ost `+"`test`.`mytable`"+` */ + FROM `+"`test`.`_mytable_gho`"+` + WHERE `+"(`id`,`order_id`)"+` IN (('1', '2'), (1, 23))`) + }) + t.Run("generateReplaceQuery1", func(t *testing.T) { + stmt := applier.generateReplaceQuery([][]string{{"1", "2"}}) + test.S(t).ExpectEquals(stmt, ` + REPLACE /* gh-ost `+"`test`.`mytable`"+` */ + INTO `+"`test`.`_mytable_gho` (`id`,`order_id`,`name`,`age`)"+` + SELECT `+"`id`,`order_id`,`name`,`age`"+` + FROM `+"`test`.`mytable`"+` + FORCE INDEX `+"(`PRIMARY KEY`)"+` + WHERE (`+"`id`,`order_id`"+`) IN ((1, 2))`) + }) + t.Run("generateReplaceQuery2", func(t *testing.T) { + stmt := applier.generateReplaceQuery([][]string{{"'1'", "'2'"}}) + test.S(t).ExpectEquals(stmt, ` + REPLACE /* gh-ost `+"`test`.`mytable`"+` */ + INTO `+"`test`.`_mytable_gho` (`id`,`order_id`,`name`,`age`)"+` + SELECT `+"`id`,`order_id`,`name`,`age`"+` + FROM `+"`test`.`mytable`"+` + FORCE INDEX `+"(`PRIMARY KEY`)"+` + WHERE (`+"`id`,`order_id`"+`) IN (('1', '2'))`) + }) + t.Run("generateReplaceQuery3", func(t *testing.T) { + stmt := applier.generateReplaceQuery([][]string{{"'1'", "'2'"}, {"1", "23"}}) + test.S(t).ExpectEquals(stmt, ` + REPLACE /* gh-ost `+"`test`.`mytable`"+` */ + INTO `+"`test`.`_mytable_gho` (`id`,`order_id`,`name`,`age`)"+` + SELECT `+"`id`,`order_id`,`name`,`age`"+` + FROM `+"`test`.`mytable`"+` + FORCE INDEX `+"(`PRIMARY KEY`)"+` + WHERE (`+"`id`,`order_id`"+`) IN (('1', '2'), (1, 23))`) + }) +} + +func TestIsIgnoreOverMaxChunkRangeEvent(t *testing.T) { + migrationContext := base.NewMigrationContext() + uniqueColumns := sql.NewColumnList([]string{"id", "date"}) + uniqueColumns.SetColumnCompareValueFunc("id", func(a interface{}, b interface{}) (int, error) { + _a := new(big.Int) + if _a, _ = _a.SetString(fmt.Sprintf("%+v", a), 10); a == nil { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", a) + } + _b := new(big.Int) + if _b, _ = _b.SetString(fmt.Sprintf("%+v", b), 10); b == nil { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", b) + } + return _a.Cmp(_b), nil + }) + + uniqueColumns.SetColumnCompareValueFunc("date", func(a interface{}, b interface{}) (int, error) { + _a := new(big.Int) + if _a, _ = _a.SetString(fmt.Sprintf("%+v", a), 10); a == nil { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", a) + } + _b := new(big.Int) + if _b, _ = _b.SetString(fmt.Sprintf("%+v", b), 10); b == nil { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", b) + } + return _a.Cmp(_b), nil + }) + + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY KEY", + Columns: *uniqueColumns, + } + migrationContext.MigrationRangeMinValues = sql.ToColumnValues([]interface{}{10, 20240110}) + migrationContext.MigrationRangeMaxValues = sql.ToColumnValues([]interface{}{123456, 20240205}) + migrationContext.MigrationIterationRangeMaxValues = sql.ToColumnValues([]interface{}{11111, 20240103}) + + applier := NewApplier(migrationContext) + + t.Run("setFalse", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = false + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{1, 20240101}) + test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(isIgnore) + }) + + t.Run("lessRangeMaxValue1", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{100, 20240101}) + test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(isIgnore) + }) + + t.Run("lessRangeMaxValue2", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{11111, 20240101}) + test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(isIgnore) + }) + + t.Run("equalRangeMaxValue", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{11111, 20240103}) + test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(isIgnore) + }) + + t.Run("greatRangeMaxValue1", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{11111, 20240104}) + test.S(t).ExpectNil(err) + test.S(t).ExpectTrue(isIgnore) + }) + + t.Run("greatRangeMaxValue2", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{11112, 20240103}) + test.S(t).ExpectNil(err) + test.S(t).ExpectTrue(isIgnore) + }) + + t.Run("lessMaxValue1", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{123456, 20240204}) + test.S(t).ExpectNil(err) + test.S(t).ExpectTrue(isIgnore) + }) + + t.Run("equalMaxValue", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{123456, 20240205}) + test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(isIgnore) + }) + + t.Run("greatMaxValue1", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{123456, 20240207}) + test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(isIgnore) + }) + + t.Run("greatMaxValue2", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{123457, 20240204}) + test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(isIgnore) + }) +} diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 9d414a43e..268c4bb96 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -10,6 +10,7 @@ import ( gosql "database/sql" "errors" "fmt" + "math/big" "reflect" "strings" "sync/atomic" @@ -137,6 +138,7 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) { for i, sharedUniqueKey := range sharedUniqueKeys { this.applyColumnTypes(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, &sharedUniqueKey.Columns) uniqueKeyIsValid := true + isMemoryComparable := true for _, column := range sharedUniqueKey.Columns.Columns() { switch column.Type { case sql.FloatColumnType: @@ -152,9 +154,15 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) { uniqueKeyIsValid = false } } + if isMemoryComparable && column.FormatValueFunc != nil { + isMemoryComparable = true + } else { + isMemoryComparable = false + } } if uniqueKeyIsValid { this.migrationContext.UniqueKey = sharedUniqueKeys[i] + this.migrationContext.UniqueKey.IsMemoryComparable = isMemoryComparable break } } @@ -598,6 +606,23 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL if strings.Contains(columnType, "unsigned") { column.IsUnsigned = true } + if strings.Contains(columnType, "int") { + column.CompareValueFunc = func(a interface{}, b interface{}) (int, error) { + _a := new(big.Int) + if _a, _ = _a.SetString(fmt.Sprintf("%+v", a), 10); a == nil { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", a) + } + _b := new(big.Int) + if _b, _ = _b.SetString(fmt.Sprintf("%+v", b), 10); b == nil { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", b) + } + return _a.Cmp(_b), nil + } + + column.FormatValueFunc = func(a interface{}) (string, error) { + return fmt.Sprintf("%+v", a), nil + } + } if strings.Contains(columnType, "mediumint") { column.Type = sql.MediumIntColumnType } @@ -612,6 +637,9 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL } if strings.Contains(columnType, "float") { column.Type = sql.FloatColumnType + column.FormatValueFunc = func(a interface{}) (string, error) { + return fmt.Sprintf("%+v", a), nil + } } if strings.HasPrefix(columnType, "enum") { column.Type = sql.EnumColumnType diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b4d0a9ae1..5db54ad12 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1025,9 +1025,10 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates() - status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s", + status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Ignored: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s", totalRowsCopied, rowsEstimate, progressPct, atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), + atomic.LoadInt64(&this.migrationContext.TotalDMLEventsIgnored), len(this.applyEventsQueue), cap(this.applyEventsQueue), base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), currentBinlogCoordinates, diff --git a/go/sql/builder.go b/go/sql/builder.go index 7be428f93..a1bab49e4 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -433,15 +433,15 @@ func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKey return result, uniqueKeyArgs, nil } -func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList, args []interface{}) (result string, sharedArgs []interface{}, err error) { +func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns, uniqueKeyColumns *ColumnList, args []interface{}) (result string, sharedArgs, uniqueKeyArgs []interface{}, err error) { if len(args) != tableColumns.Len() { - return result, args, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery") + return result, args, nil, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery") } if !sharedColumns.IsSubsetOf(tableColumns) { - return result, args, fmt.Errorf("shared columns is not a subset of table columns in BuildDMLInsertQuery") + return result, args, nil, fmt.Errorf("shared columns is not a subset of table columns in BuildDMLInsertQuery") } if sharedColumns.Len() == 0 { - return result, args, fmt.Errorf("No shared columns found in BuildDMLInsertQuery") + return result, args, nil, fmt.Errorf("No shared columns found in BuildDMLInsertQuery") } databaseName = EscapeName(databaseName) tableName = EscapeName(tableName) @@ -452,6 +452,12 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol sharedArgs = append(sharedArgs, arg) } + for _, column := range uniqueKeyColumns.Columns() { + tableOrdinal := tableColumns.Ordinals[column.Name] + arg := column.convertArg(args[tableOrdinal], true) + uniqueKeyArgs = append(uniqueKeyArgs, arg) + } + mappedSharedColumnNames := duplicateNames(mappedSharedColumns.Names()) for i := range mappedSharedColumnNames { mappedSharedColumnNames[i] = EscapeName(mappedSharedColumnNames[i]) @@ -470,7 +476,7 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol strings.Join(mappedSharedColumnNames, ", "), strings.Join(preparedValues, ", "), ) - return result, sharedArgs, nil + return result, sharedArgs, uniqueKeyArgs, nil } func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns, uniqueKeyColumns *ColumnList, valueArgs, whereArgs []interface{}) (result string, sharedArgs, uniqueKeyArgs []interface{}, err error) { diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index 574e8bb1b..dd49e7f29 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -491,7 +491,8 @@ func TestBuildDMLInsertQuery(t *testing.T) { args := []interface{}{3, "testname", "first", 17, 23} { sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) - query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, args) + uniqueKeyColumns := NewColumnList([]string{"position"}) + query, sharedArgs, uniqueKeyArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns, args) test.S(t).ExpectNil(err) expected := ` replace /* gh-ost mydb.tbl */ @@ -502,10 +503,12 @@ func TestBuildDMLInsertQuery(t *testing.T) { ` test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", 17, 23})) + test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{17})) } { sharedColumns := NewColumnList([]string{"position", "name", "age", "id"}) - query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, args) + uniqueKeyColumns := NewColumnList([]string{"position", "name"}) + query, sharedArgs, uniqueKeyArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns, args) test.S(t).ExpectNil(err) expected := ` replace /* gh-ost mydb.tbl */ @@ -516,15 +519,18 @@ func TestBuildDMLInsertQuery(t *testing.T) { ` test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{17, "testname", 23, 3})) + test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{17, "testname"})) } { sharedColumns := NewColumnList([]string{"position", "name", "surprise", "id"}) - _, _, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, args) + uniqueKeyColumns := NewColumnList([]string{"id"}) + _, _, _, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns, args) test.S(t).ExpectNotNil(err) } { sharedColumns := NewColumnList([]string{}) - _, _, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, args) + uniqueKeyColumns := NewColumnList([]string{}) + _, _, _, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns, args) test.S(t).ExpectNotNil(err) } } @@ -538,7 +544,8 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { // testing signed args := []interface{}{3, "testname", "first", int8(-1), 23} sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) - query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, args) + uniqueKeyColumns := NewColumnList([]string{"position"}) + query, sharedArgs, uniqueKeyArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns, args) test.S(t).ExpectNil(err) expected := ` replace /* gh-ost mydb.tbl */ @@ -549,12 +556,14 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { ` test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", int8(-1), 23})) + test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{int8(-1)})) } { // testing unsigned args := []interface{}{3, "testname", "first", int8(-1), 23} sharedColumns.SetUnsigned("position") - query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, args) + uniqueKeyColumns := NewColumnList([]string{"position"}) + query, sharedArgs, uniqueKeyArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns, args) test.S(t).ExpectNil(err) expected := ` replace /* gh-ost mydb.tbl */ @@ -565,12 +574,14 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { ` test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", uint8(255), 23})) + test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{uint8(255)})) } { // testing unsigned args := []interface{}{3, "testname", "first", int32(-1), 23} sharedColumns.SetUnsigned("position") - query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, args) + uniqueKeyColumns := NewColumnList([]string{"position"}) + query, sharedArgs, uniqueKeyArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns, args) test.S(t).ExpectNil(err) expected := ` replace /* gh-ost mydb.tbl */ @@ -581,6 +592,7 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { ` test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", uint32(4294967295), 23})) + test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{uint32(4294967295)})) } } diff --git a/go/sql/types.go b/go/sql/types.go index 3be1a44ca..a25efde85 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -49,6 +49,9 @@ type Column struct { // https://github.com/github/gh-ost/issues/909 BinaryOctetLength uint charsetConversion *CharacterSetConversion + // compare a and b using this function, when a equal b, return 0, when a > b, return 1, when a < b, return -1 + CompareValueFunc func(a interface{}, b interface{}) (int, error) + FormatValueFunc func(a interface{}) (string, error) } func (this *Column) convertArg(arg interface{}, isUniqueKeyColumn bool) interface{} { @@ -225,6 +228,14 @@ func (this *ColumnList) String() string { return strings.Join(this.Names(), ",") } +func (this *ColumnList) EscapeString() string { + var cols []string + for _, col := range this.Names() { + cols = append(cols, fmt.Sprintf("`%s`", col)) + } + return strings.Join(cols, ",") +} + func (this *ColumnList) Equals(other *ColumnList) bool { return reflect.DeepEqual(this.Columns, other.Columns) } @@ -252,12 +263,21 @@ func (this *ColumnList) SetCharsetConversion(columnName string, fromCharset stri this.GetColumn(columnName).charsetConversion = &CharacterSetConversion{FromCharset: fromCharset, ToCharset: toCharset} } +func (this *ColumnList) SetColumnCompareValueFunc(columnName string, f func(a interface{}, b interface{}) (int, error)) { + this.GetColumn(columnName).CompareValueFunc = f +} + +func (this *ColumnList) GetColumnCompareValueFunc(columnName string) func(a interface{}, b interface{}) (int, error) { + return this.GetColumn(columnName).CompareValueFunc +} + // UniqueKey is the combination of a key's name and columns type UniqueKey struct { - Name string - Columns ColumnList - HasNullable bool - IsAutoIncrement bool + Name string + Columns ColumnList + HasNullable bool + IsAutoIncrement bool + IsMemoryComparable bool } // IsPrimary checks if this unique key is primary @@ -277,6 +297,21 @@ func (this *UniqueKey) String() string { return fmt.Sprintf("%s: %s; has nullable: %+v", description, this.Columns.Names(), this.HasNullable) } +func (this *UniqueKey) FormatValues(args []interface{}) ([]string, error) { + var values []string + for i, column := range this.Columns.Columns() { + if column.FormatValueFunc == nil { + return nil, fmt.Errorf("column %s does not support format value", column.Name) + } + val, err := column.FormatValueFunc(args[i]) + if err != nil { + return nil, err + } + values = append(values, val) + } + return values, nil +} + type ColumnValues struct { abstractValues []interface{} ValuesPointers []interface{}