From 2c49948a55b7b3549d3c03ae4a21fdacc0d0b0eb Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 12 Dec 2024 13:27:33 -0500 Subject: [PATCH] Properly handle PK changes Signed-off-by: Matt Lord --- go/mysql/binlog/binlog_json.go | 10 +- .../vreplication/vreplication_test.go | 1 + .../vreplication/replicator_plan.go | 105 ++++++++++++++---- .../vreplication/table_plan_partial.go | 4 + .../vreplication/vplayer_flaky_test.go | 59 ++++++++++ 5 files changed, 157 insertions(+), 22 deletions(-) diff --git a/go/mysql/binlog/binlog_json.go b/go/mysql/binlog/binlog_json.go index e3ca52096f2..8db78edfd23 100644 --- a/go/mysql/binlog/binlog_json.go +++ b/go/mysql/binlog/binlog_json.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/mysql/format" "vitess.io/vitess/go/mysql/json" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" querypb "vitess.io/vitess/go/vt/proto/query" @@ -110,7 +111,7 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) { diff.WriteString(innerStr) diff.WriteString(", ") } else { // Only the inner most function has the field name - diff.WriteString("`%s`, ") // This will later be replaced by the field name + diff.WriteString("%s, ") // This will later be replaced by the field name } pathLen, readTo := readVariableLength(data, pos) @@ -120,7 +121,10 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) { // We have to specify the unicode character set for the strings we // use in the expression as the connection can be using a different // character set (e.g. vreplication always uses set names binary). 
- diff.WriteString(fmt.Sprintf("_utf8mb4'%s'", path)) + diff.WriteString(sqlparser.Utf8mb4Str) + diff.WriteByte('\'') + diff.Write(path) + diff.WriteByte('\'') if opType == jsonDiffOpRemove { // No value for remove diff.WriteString(")") @@ -135,7 +139,7 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) { } pos += valueLen if value.Type() == json.TypeString { - diff.WriteString("_utf8mb4") + diff.WriteString(sqlparser.Utf8mb4Str) } diff.WriteString(fmt.Sprintf("%s)", value)) } diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index a6da24ef8fc..4269bd68cab 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -732,6 +732,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j3 = JSON_SET(JSON_REMOVE(JSON_REPLACE(j3, '$.day', 'tuesday'), '$.favorite_color'), '$.hobby', 'skiing') where id = 4") execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j3 = JSON_SET(JSON_SET(j3, '$.salary', 110), '$.role', 'IC') where id = 4") execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j3 = JSON_SET(j3, '$.misc', '{\"address\":\"1012 S Park St\", \"town\":\"Hastings\", \"state\":\"MI\"}') where id = 1") + execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set id=id+1000, j3=JSON_SET(j3, '$.day', 'friday')") waitForNoWorkflowLag(t, vc, targetKs, workflow) dec80Replicated := false for _, tablet := range []*cluster.VttabletProcess{customerTab1, customerTab2} { diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index 206a1d9f037..697d6c74891 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -17,6 +17,7 @@ limitations under the License. 
package vreplication import ( + "bytes" "encoding/json" "fmt" "slices" @@ -30,6 +31,7 @@ import ( vjson "vitess.io/vitess/go/mysql/json" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/ptr" + "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/sqlparser" @@ -383,30 +385,31 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun after = true vals := sqltypes.MakeRowTrusted(tp.Fields, rowChange.After) for i, field := range tp.Fields { - var bindVar *querypb.BindVariable - var newVal *sqltypes.Value - var err error + var ( + bindVar *querypb.BindVariable + newVal *sqltypes.Value + err error + ) if field.Type == querypb.Type_JSON { switch { case vals[i].IsNull(): // An SQL NULL and not an actual JSON value newVal = &sqltypes.NULL case rowChange.JsonPartialValues != nil && isBitSet(rowChange.JsonPartialValues.Cols, jsonIndex) && !slices.Equal(vals[i].Raw(), sqltypes.NullBytes): - // An SQL expression that can be converted to a JSON value such - // as JSON_INSERT(). - // This occurs when using partial JSON values as a result of - // mysqld using binlog-row-value-options=PARTIAL_JSON. + // An SQL expression that can be converted to a JSON value such as JSON_INSERT(). + // This occurs when using partial JSON values as a result of mysqld using + // binlog-row-value-options=PARTIAL_JSON. if len(vals[i].Raw()) == 0 { // When using BOTH binlog_row_image=NOBLOB AND - // binlog_row_value_options=PARTIAL_JSON then the JSON - // column has the data bit set and the diff is empty. So - // we have to account for this by unsetting the data bit - // so that the current JSON value is not overwritten. + // binlog_row_value_options=PARTIAL_JSON then the JSON column has the data bit + // set and the diff is empty when it's not present. So we have to account for + // this by unsetting the data bit so that the current JSON value is not lost. 
setBit(rowChange.DataColumns.Cols, i, false) newVal = ptr.Of(sqltypes.MakeTrusted(querypb.Type_EXPRESSION, nil)) } else { + escapedName := sqlescape.EscapeID(field.Name) newVal = ptr.Of(sqltypes.MakeTrusted(querypb.Type_EXPRESSION, []byte( - fmt.Sprintf(vals[i].RawStr(), field.Name), + fmt.Sprintf(vals[i].RawStr(), escapedName), ))) } default: // A JSON value (which may be a JSON null literal value) @@ -428,7 +431,7 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun } switch { case !before && after: - // only apply inserts for rows whose primary keys are within the range of rows already copied + // Only apply inserts for rows whose primary keys are within the range of rows already copied. if tp.isOutsidePKRange(bindvars, before, after, "insert") { return nil, nil } @@ -465,15 +468,79 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun return nil, err } } - // TODO: the INSERTs done here after deleting the row with the original PK - // need to use the values from the BEFORE image for the columns NOT present - // in the AFTER image due to being a partial image due to the source's usage - // of binlog-row-image=NOBLOB. - // For JSON columns when binlog-row-value-options=PARTIAL_JSON is used, we - // need to wrap the JSON diff function(s) around the BEFORE value. if tp.isOutsidePKRange(bindvars, before, after, "insert") { return nil, nil } + if tp.isPartial(rowChange) { + // We need to use a combination of the values in the BEFORE and AFTER image to generate + // the new row. 
+ vals := sqltypes.MakeRowTrusted(tp.Fields, rowChange.After) + jsonIndex := 0 + for i, field := range tp.Fields { + if field.Type == querypb.Type_JSON && rowChange.JsonPartialValues != nil { + if isBitSet(rowChange.JsonPartialValues.Cols, jsonIndex) { + if len(vals[i].Raw()) == 0 { + // When using BOTH binlog_row_image=NOBLOB AND + // binlog_row_value_options=PARTIAL_JSON then the JSON column has the data bit + // set and the diff is empty when it's not present. So we want to use the + // BEFORE image value. + if bindvars["b_"+field.Name] == nil { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, + "binary log event missing a needed value for %s.%s due to the usage of binlog-row-image=NOBLOB; you will need to re-run the workflow with binlog-row-image=FULL", + tp.TargetName, field.Name) + } + beforeVal, err := vjson.MarshalSQLValue(bindvars["b_"+field.Name].Value) + if err != nil { + return nil, vterrors.Wrapf(err, "failed to convert JSON to SQL field value for %s.%s when building insert query", + tp.TargetName, field.Name) + } + bindvars["a_"+field.Name], err = tp.bindFieldVal(field, beforeVal) + if err != nil { + return nil, vterrors.Wrapf(err, "failed to bind field value for %s.%s when building insert query", + tp.TargetName, field.Name) + } + } else { + // For JSON columns when binlog-row-value-options=PARTIAL_JSON is used, we + // need to wrap the JSON diff function(s) around the BEFORE value. 
+ diff := bindvars["a_"+field.Name].Value + if bindvars["b_"+field.Name] == nil { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, + "binary log event missing a needed value for %s.%s due to the usage of binlog-row-value-options=PARTIAL_JSON; you will need to re-run the workflow without this option enabled", + tp.TargetName, field.Name) + } + beforeVal := bindvars["b_"+field.Name].Value + afterVal := bytes.Buffer{} + afterVal.Grow(len(diff) + len(beforeVal) + len(sqlparser.Utf8mb4Str) + 2) // +2 is for the enclosing quotes + // If the JSON column is partial, we need to specify the BEFORE value as + // the input for the diff function(s). + afterVal.WriteString(sqlparser.Utf8mb4Str) + afterVal.WriteByte('\'') + afterVal.Write(beforeVal) + afterVal.WriteByte('\'') + newVal := sqltypes.MakeTrusted(querypb.Type_EXPRESSION, []byte( + fmt.Sprintf(vals[i].RawStr(), afterVal.String()), + )) + bindVar, err := tp.bindFieldVal(field, &newVal) + if err != nil { + return nil, vterrors.Wrapf(err, "failed to bind field value for %s.%s when building insert query", + tp.TargetName, field.Name) + } + bindvars["a_"+field.Name] = bindVar + } + } + jsonIndex++ + } + if bindvars["a_"+field.Name] == nil { + if bindvars["b_"+field.Name] == nil { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, + "binary log event missing a needed value for %s.%s due to the usage of binlog-row-image=NOBLOB; you will need to re-run the workflow with binlog-row-image=FULL", + tp.TargetName, field.Name) + } + // Use the BEFORE image value for the new row. + bindvars["a_"+field.Name] = bindvars["b_"+field.Name] + } + } + } return execParsedQuery(tp.Insert, bindvars, executor) } // Unreachable. 
diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go index a4e177a9f14..ea2fc563a1e 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go @@ -208,3 +208,7 @@ func (tp *TablePlan) getPartialUpdateQuery(dataColumns *binlogdatapb.RowChange_B tp.Stats.PartialQueryCacheSize.Add([]string{"update"}, 1) return upd, nil } + +func (tp *TablePlan) getInsertForPartialRow(dataColumns *binlogdatapb.RowChange_Bitmap, jsonPartialValues *binlogdatapb.RowChange_Bitmap) (*sqlparser.ParsedQuery, error) { + return nil, nil +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 50d93e60e5a..835f066e7f6 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -1519,6 +1519,65 @@ func TestPlayerRowMove(t *testing.T) { validateQueryCountStat(t, "replicate", 3) } +func TestPlayerUpdatePK(t *testing.T) { + defer deleteTablet(addTablet(100)) + + execStatements(t, []string{ + "create table src(id int, bd blob, jd json, primary key(id))", + fmt.Sprintf("create table %s.dst(id int, bd blob, jd json, primary key(id))", vrepldb), + }) + defer execStatements(t, []string{ + "drop table src", + fmt.Sprintf("drop table %s.dst", vrepldb), + }) + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "dst", + Filter: "select * from src", + }}, + } + bls := &binlogdatapb.BinlogSource{ + Keyspace: env.KeyspaceName, + Shard: env.ShardName, + Filter: filter, + OnDdl: binlogdatapb.OnDDLAction_IGNORE, + } + cancel, _ := startVReplication(t, bls, "") + defer cancel() + + execStatements(t, []string{ + "insert into src values(1, 'blob data', _utf8mb4'{\"key1\":\"val1\"}'), (2, 'blob data2', _utf8mb4'{\"key2\":\"val2\"}'), (3, 
'blob data3', _utf8mb4'{\"key3\":\"val3\"}')",
+	})
+	expectDBClientQueries(t, qh.Expect(
+		"begin",
+		"insert into dst(id,bd,jd) values (1,_binary'blob data','{\"key1\": \"val1\"}'), (2,_binary'blob data2','{\"key2\": \"val2\"}'), (3,_binary'blob data3','{\"key3\": \"val3\"}')",
+		"/update _vt.vreplication set pos=",
+		"commit",
+	))
+	expectData(t, "dst", [][]string{
+		{"1", "blob data", "{\"key1\": \"val1\"}"},
+		{"2", "blob data2", "{\"key2\": \"val2\"}"},
+		{"3", "blob data3", "{\"key3\": \"val3\"}"},
+	})
+	execStatements(t, []string{
+		"update src set id=id+10, bd='updated blob data', jd=JSON_SET(jd, '$.key3', 'newval3') where id=3",
+	})
+	expectDBClientQueries(t, qh.Expect(
+		"begin",
+		"delete from dst where id=3",
+		"insert into dst(id,bd,jd) values (13,_binary'updated blob data','{\"key3\": \"newval3\"}')",
+		"/update _vt.vreplication set pos=",
+		"commit",
+	))
+	expectData(t, "dst", [][]string{
+		{"1", "blob data", "{\"key1\": \"val1\"}"},
+		{"2", "blob data2", "{\"key2\": \"val2\"}"},
+		{"13", "updated blob data", "{\"key3\": \"newval3\"}"},
+	})
+	validateQueryCountStat(t, "replicate", 3)
+}
+
 func TestPlayerTypes(t *testing.T) {
 	defer deleteTablet(addTablet(100))
 	execStatements(t, []string{