From cc08784302d38b65e09276f1e2e34686a60fee44 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 20 Feb 2024 23:20:13 -0500 Subject: [PATCH] Fixes Signed-off-by: Matt Lord --- go/mysql/binlog_event.go | 3 +- go/mysql/binlog_event_rbr.go | 46 ++++++++++--------- .../tabletserver/repltracker/reader.go | 3 +- .../vttablet/tabletserver/schema/historian.go | 4 +- go/vt/vttablet/tabletserver/schema/tracker.go | 10 ++-- .../vstreamer/helper_event_test.go | 4 +- .../tabletserver/vstreamer/vstreamer.go | 20 +++++++- .../tabletserver/vstreamer/vstreamer_test.go | 9 ++-- 8 files changed, 58 insertions(+), 41 deletions(-) diff --git a/go/mysql/binlog_event.go b/go/mysql/binlog_event.go index f2338ce9d6e..07407766e57 100644 --- a/go/mysql/binlog_event.go +++ b/go/mysql/binlog_event.go @@ -19,6 +19,7 @@ package mysql import ( "fmt" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/replication" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) @@ -222,7 +223,7 @@ type TableMap struct { // override for character based columns ONLY. This means that the // array position needs to be mapped to the ordered list of // character based columns in the table. - ColumnCollationIDs []uint64 + ColumnCollationIDs []collations.ID } // Rows contains data from a {WRITE,UPDATE,DELETE}_ROWS_EVENT. diff --git a/go/mysql/binlog_event_rbr.go b/go/mysql/binlog_event_rbr.go index 19e1285e0e0..2065274a7b3 100644 --- a/go/mysql/binlog_event_rbr.go +++ b/go/mysql/binlog_event_rbr.go @@ -20,12 +20,11 @@ import ( "encoding/binary" "vitess.io/vitess/go/mysql/binlog" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/proto/vtrpc" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/vt/vterrors" querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // These are the TABLE_MAP_EVENT's optional metadata field types from: libbinlogevents/include/rows_event.h @@ -84,7 +83,7 @@ func (ev binlogEvent) TableMap(f BinlogFormat) (*TableMap, error) { columnCount, read, ok := readLenEncInt(data, pos) if !ok { - return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data) } pos = read @@ -93,7 +92,7 @@ func (ev binlogEvent) TableMap(f BinlogFormat) (*TableMap, error) { metaLen, read, ok := readLenEncInt(data, pos) if !ok { - return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected metadata length at position %v (data=%v)", pos, data) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected metadata length at position %v (data=%v)", pos, data) } pos = read @@ -108,21 +107,21 @@ func (ev binlogEvent) TableMap(f BinlogFormat) (*TableMap, error) { } } if pos != expectedEnd { - return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected metadata end: got %v was expecting %v (data=%v)", pos, expectedEnd, data) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected metadata end: got %v was expecting %v (data=%v)", pos, expectedEnd, data) } // A bit array that says if each column can be NULL. result.CanBeNull, _ = newBitmap(data, pos, int(columnCount)) - //log.Errorf("DEBUG: Remining bytes for %s: %v", result.Name, data[pos:]) + //log.Errorf("DEBUG: Remaining optional metadata bytes for %s: %v", result.Name, data[pos:]) - // Read any CHAR based column collation overrides in the optional metadata. + // Read any text based column collation values provided in the optional metadata. var err error if result.ColumnCollationIDs, err = readColumnCollationIDs(data, pos); err != nil { return nil, err } - log.Errorf("DEBUG: table %s; ColumnCollationIDs: %+v", result.Name, result.ColumnCollationIDs) + //log.Errorf("DEBUG: table %s; ColumnCollationIDs: %+v", result.Name, result.ColumnCollationIDs) return result, nil } @@ -148,7 +147,7 @@ func metadataLength(typ byte) int { default: // Unknown type. This is used in tests only, so panic. - panic(vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataLength: unhandled data type: %v", typ)) + panic(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataLength: unhandled data type: %v", typ)) } } @@ -184,7 +183,7 @@ func metadataRead(data []byte, pos int, typ byte) (uint16, int, error) { default: // Unknown types, we can't go on. - return 0, 0, vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ) + return 0, 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ) } } @@ -215,7 +214,7 @@ func metadataWrite(data []byte, pos int, typ byte, value uint16) int { default: // Unknown type. This is used in tests only, so panic. - panic(vterrors.Errorf(vtrpc.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ)) + panic(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "metadataRead: unhandled data type: %v", typ)) } } @@ -226,18 +225,23 @@ func metadataWrite(data []byte, pos int, typ byte, value uint16) int { // and the table definition. // We only care about the collation IDs of the character based columns and // this info is provided in all binlog_row_metadata formats. -func readColumnCollationIDs(data []byte, pos int) ([]uint64, error) { - collationIDs := make([]uint64, 0) +func readColumnCollationIDs(data []byte, pos int) ([]collations.ID, error) { + // Heurestic allocation of the slice's backing array. + collationIDs := make([]collations.ID, 0, len(data[pos:])-3) for pos < len(data) { fieldType := uint64(data[pos]) pos++ + if fieldType == 254 { // I don't know WTFudge this is yet... but the payload then seems invalid + return nil, nil + } + if fieldType == 0 { // Null byte separator continue } fieldLen, read, ok := readLenEncInt(data, pos) - if !ok { + if !ok || read+int(fieldLen) > len(data) { return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "error reading optional metadata field length") } pos = read @@ -247,14 +251,14 @@ func readColumnCollationIDs(data []byte, pos int) ([]uint64, error) { //log.Errorf("DEBUG: Optional Metadata Field Type: %v, Length: %v, Value: %v", fieldType, fieldLen, fieldVal) if fieldType == TableMapDefaultCharset || fieldType == TableMapColumnCharset { // It's one or the other - //log.Errorf("DEBUG: Number of charset bytes : %d", fieldLen) for i := uint64(0); i < fieldLen; i++ { - v := uint64(fieldVal[i]) - if v == 0 || v == 252 { // Ignore Null or unimplemented, respectively - continue + v := uint16(fieldVal[i]) + if v == 252 { // The ID is the subsequent 2 bytes + v = binary.LittleEndian.Uint16(fieldVal[i+1 : i+3]) + i += 2 } + collationIDs = append(collationIDs, collations.ID(v)) //log.Errorf("DEBUG: charset idx %d: %v", i, v) - collationIDs = append(collationIDs, v) } } } @@ -307,7 +311,7 @@ func (ev binlogEvent) Rows(f BinlogFormat, tm *TableMap) (Rows, error) { columnCount, read, ok := readLenEncInt(data, pos) if !ok { - return result, vterrors.Errorf(vtrpc.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data) + return result, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected column count at position %v (data=%v)", pos, data) } pos = read diff --git a/go/vt/vttablet/tabletserver/repltracker/reader.go b/go/vt/vttablet/tabletserver/repltracker/reader.go index 694778d1119..b50e5e4b2c7 100644 --- a/go/vt/vttablet/tabletserver/repltracker/reader.go +++ b/go/vt/vttablet/tabletserver/repltracker/reader.go @@ -23,13 +23,12 @@ import ( "time" "vitess.io/vitess/go/constants/sidecar" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" diff --git a/go/vt/vttablet/tabletserver/schema/historian.go b/go/vt/vttablet/tabletserver/schema/historian.go index b65ab514585..ca57f6d43e0 100644 --- a/go/vt/vttablet/tabletserver/schema/historian.go +++ b/go/vt/vttablet/tabletserver/schema/historian.go @@ -26,11 +26,11 @@ import ( "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" - "vitess.io/vitess/go/vt/sqlparser" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) const getInitialSchemaVersions = "select id, pos, ddl, time_updated, schemax from %s.schema_version where time_updated > %d order by id asc" diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index bce5e4b33d6..8db202efa13 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -25,18 +25,18 @@ import ( "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/mysql/replication" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" - "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" - "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) -// VStreamer defines the functions of VStreamer +// VStreamer defines the functions of VStreamer // that the replicationWatcher needs. type VStreamer interface { Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error diff --git a/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go b/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go index 9a4717386d9..3e57bc45952 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go @@ -73,7 +73,6 @@ type TestColumn struct { len int64 dataTypeLowered string skip bool - collationName string } // TestFieldEvent has all the attributes of a table required for creating a field event. @@ -377,6 +376,7 @@ func (ts *TestSpec) getFieldEvent(table *schemadiff.CreateTableEntity) *TestFiel tc.collationID = testenv.DefaultCollationID } else { tc.collationID = collations.MySQL8().LookupByName(collationName) + log.Errorf("DEBUG: SchemaDiff provided collation for %s.%s: %s:%d", tfe.table, tc.name, collationName, tc.collationID) } collation := colldata.Lookup(tc.collationID) switch tc.dataTypeLowered { @@ -395,7 +395,6 @@ func (ts *TestSpec) getFieldEvent(table *schemadiff.CreateTableEntity) *TestFiel if tc.dataTypeLowered == "char" && collation.IsBinary() { tc.dataType = "BINARY" } - tc.collationID = testenv.DefaultCollationID } tc.colType = fmt.Sprintf("%s(%d)", tc.dataTypeLowered, l) case "blob": @@ -405,7 +404,6 @@ func (ts *TestSpec) getFieldEvent(table *schemadiff.CreateTableEntity) *TestFiel case "text": tc.len = lengthText tc.colType = "text" - tc.collationID = testenv.DefaultCollationID case "set": tc.len = lengthSet tc.colType = fmt.Sprintf("%s(%s)", tc.dataTypeLowered, strings.Join(col.Type.EnumValues, ",")) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 9c63f8a499c..cbd0466fe2f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -733,7 +733,6 @@ func (vs *vstreamer) buildTablePlan(id uint64, tm *mysql.TableMap) (*binlogdatap if err != nil { return nil, err } - table := &Table{ Name: tm.Name, Fields: cols, @@ -763,12 +762,21 @@ func (vs *vstreamer) buildTablePlan(id uint64, tm *mysql.TableMap) (*binlogdatap func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, error) { var fields []*querypb.Field + var charFieldIdx int for i, typ := range tm.Types { t, err := sqltypes.MySQLToType(typ, 0) if err != nil { return nil, fmt.Errorf("unsupported type: %d, position: %d", typ, i) } - coll := collations.CollationForType(t, vs.se.Environment().CollationEnv().DefaultConnectionCharset()) + // Use the the collation inherited or the one specified specifically + // for the column if one was provided in the event's metadata. + var coll collations.ID + if sqltypes.IsText(t) && len(tm.ColumnCollationIDs) > charFieldIdx { + coll = tm.ColumnCollationIDs[charFieldIdx] + charFieldIdx++ + } else { // Use the server defined default for the column's type + coll = collations.CollationForType(t, vs.se.Environment().CollationEnv().DefaultConnectionCharset()) + } fields = append(fields, &querypb.Field{ Name: fmt.Sprintf("@%d", i+1), Type: t, @@ -805,6 +813,14 @@ func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, er if err != nil { return nil, err } + // This uses the historian which queries the columns in the table and uses the + // generated fields. This means that the fields are using collations for the + // column types based on the *connection collation* and not the actual + // *column collation*. + // So we copy the collation information from the actual TableMap here. + for i := range fieldsCopy { + fieldsCopy[i].Charset = fields[i].Charset + } return fieldsCopy, nil } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index a05ca9bd149..9f225de51d6 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -156,7 +156,7 @@ func TestCellValuePadding(t *testing.T) { ddls: []string{ "create table t1(id int, val binary(4), primary key(val))", "create table t2(id int, val char(4), primary key(val))", - "create table t3(id int, val char(4) collate utf8mb4_bin, primary key(val))"}, + "create table t3(id int, txt text, val char(4) collate utf8mb4_bin, val2 varchar(64) collate utf8mb4_general_ci, val3 varchar(255) collate utf8mb4_ja_0900_as_cs, primary key(val))"}, } defer ts.Close() require.NoError(t, ts.Init()) @@ -172,10 +172,10 @@ func TestCellValuePadding(t *testing.T) { {"update t2 set id = 11 where val = 'aaa'", []TestRowEvent{ {spec: &TestRowEventSpec{table: "t2", changes: []TestRowChange{{before: []string{"1", "aaa"}, after: []string{"11", "aaa"}}}}}, }}, - {"insert into t3 values (1, 'aaa')", nil}, - {"insert into t3 values (2, 'bb')", nil}, + {"insert into t3 values (1, 'aaa', 'aaa', 'aaa', 'aaa')", nil}, + {"insert into t3 values (2, 'bb', 'bb', 'bb', 'bb')", nil}, {"update t3 set id = 11 where val = 'aaa'", []TestRowEvent{ - {spec: &TestRowEventSpec{table: "t3", changes: []TestRowChange{{before: []string{"1", "aaa"}, after: []string{"11", "aaa"}}}}}, + {spec: &TestRowEventSpec{table: "t3", changes: []TestRowChange{{before: []string{"1", "aaa", "aaa", "aaa", "aaa"}, after: []string{"11", "aaa", "aaa", "aaa", "aaa"}}}}}, }}, {"commit", nil}, }} @@ -2097,7 +2097,6 @@ func TestGeneratedInvisiblePrimaryKey(t *testing.T) { } func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, position string, tablePK []*binlogdatapb.TableLastPK) { - ctx, cancel := context.WithCancel(context.Background()) defer cancel() wg, ch := startStream(ctx, t, filter, position, tablePK)